mirror of https://github.com/apache/lucene.git
SOLR-12392: Fix waitForElapsed logic and state restoration. Enable the test.
This commit is contained in:
parent
6ca0c5f98a
commit
d27a2e8996
|
@ -73,7 +73,8 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
private long aboveBytes, aboveDocs, belowBytes, belowDocs;
|
private long aboveBytes, aboveDocs, belowBytes, belowDocs;
|
||||||
private CollectionParams.CollectionAction aboveOp, belowOp;
|
private CollectionParams.CollectionAction aboveOp, belowOp;
|
||||||
private final Set<String> collections = new HashSet<>();
|
private final Set<String> collections = new HashSet<>();
|
||||||
private final Map<String, Long> lastEventMap = new ConcurrentHashMap<>();
|
private final Map<String, Long> lastAboveEventMap = new ConcurrentHashMap<>();
|
||||||
|
private final Map<String, Long> lastBelowEventMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public IndexSizeTrigger(String name) {
|
public IndexSizeTrigger(String name) {
|
||||||
super(TriggerEventType.INDEXSIZE, name);
|
super(TriggerEventType.INDEXSIZE, name);
|
||||||
|
@ -154,16 +155,22 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, Object> getState() {
|
protected Map<String, Object> getState() {
|
||||||
Map<String, Object> state = new HashMap<>();
|
Map<String, Object> state = new HashMap<>();
|
||||||
state.put("lastEventMap", lastEventMap);
|
state.put("lastAboveEventMap", lastAboveEventMap);
|
||||||
|
state.put("lastBelowEventMap", lastBelowEventMap);
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setState(Map<String, Object> state) {
|
protected void setState(Map<String, Object> state) {
|
||||||
this.lastEventMap.clear();
|
this.lastAboveEventMap.clear();
|
||||||
Map<String, Long> replicaVsTime = (Map<String, Long>)state.get("lastEventMap");
|
this.lastBelowEventMap.clear();
|
||||||
|
Map<String, Long> replicaVsTime = (Map<String, Long>)state.get("lastAboveEventMap");
|
||||||
if (replicaVsTime != null) {
|
if (replicaVsTime != null) {
|
||||||
this.lastEventMap.putAll(replicaVsTime);
|
this.lastAboveEventMap.putAll(replicaVsTime);
|
||||||
|
}
|
||||||
|
replicaVsTime = (Map<String, Long>)state.get("lastBelowEventMap");
|
||||||
|
if (replicaVsTime != null) {
|
||||||
|
this.lastBelowEventMap.putAll(replicaVsTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,6 +178,12 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
public void restoreState(AutoScaling.Trigger old) {
|
public void restoreState(AutoScaling.Trigger old) {
|
||||||
assert old.isClosed();
|
assert old.isClosed();
|
||||||
if (old instanceof IndexSizeTrigger) {
|
if (old instanceof IndexSizeTrigger) {
|
||||||
|
IndexSizeTrigger that = (IndexSizeTrigger)old;
|
||||||
|
assert this.name.equals(that.name);
|
||||||
|
this.lastAboveEventMap.clear();
|
||||||
|
this.lastBelowEventMap.clear();
|
||||||
|
this.lastAboveEventMap.putAll(that.lastAboveEventMap);
|
||||||
|
this.lastBelowEventMap.putAll(that.lastBelowEventMap);
|
||||||
} else {
|
} else {
|
||||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
|
||||||
"Unable to restore state from an unknown type of trigger");
|
"Unable to restore state from an unknown type of trigger");
|
||||||
|
@ -275,44 +288,54 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
|
|
||||||
// collection / list(info)
|
// collection / list(info)
|
||||||
Map<String, List<ReplicaInfo>> aboveSize = new HashMap<>();
|
Map<String, List<ReplicaInfo>> aboveSize = new HashMap<>();
|
||||||
currentSizes.entrySet().stream()
|
|
||||||
.filter(e -> (
|
currentSizes.forEach((coreName, info) -> {
|
||||||
(Long)e.getValue().getVariable(BYTES_SIZE_PROP) > aboveBytes ||
|
if ((Long)info.getVariable(BYTES_SIZE_PROP) > aboveBytes ||
|
||||||
(Long)e.getValue().getVariable(DOCS_SIZE_PROP) > aboveDocs
|
(Long)info.getVariable(DOCS_SIZE_PROP) > aboveDocs) {
|
||||||
) && waitForElapsed(e.getKey(), now, lastEventMap))
|
if (waitForElapsed(coreName, now, lastAboveEventMap)) {
|
||||||
.forEach(e -> {
|
|
||||||
ReplicaInfo info = e.getValue();
|
|
||||||
List<ReplicaInfo> infos = aboveSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
|
List<ReplicaInfo> infos = aboveSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
|
||||||
if (!infos.contains(info)) {
|
if (!infos.contains(info)) {
|
||||||
if ((Long)e.getValue().getVariable(BYTES_SIZE_PROP) > aboveBytes) {
|
if ((Long)info.getVariable(BYTES_SIZE_PROP) > aboveBytes) {
|
||||||
info.getVariables().put(VIOLATION_PROP, ABOVE_BYTES_PROP);
|
info.getVariables().put(VIOLATION_PROP, ABOVE_BYTES_PROP);
|
||||||
} else {
|
} else {
|
||||||
info.getVariables().put(VIOLATION_PROP, ABOVE_DOCS_PROP);
|
info.getVariables().put(VIOLATION_PROP, ABOVE_DOCS_PROP);
|
||||||
}
|
}
|
||||||
infos.add(info);
|
infos.add(info);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
} else {
|
||||||
|
// no violation - clear waitForElapsed
|
||||||
|
lastAboveEventMap.remove(coreName);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// collection / list(info)
|
// collection / list(info)
|
||||||
Map<String, List<ReplicaInfo>> belowSize = new HashMap<>();
|
Map<String, List<ReplicaInfo>> belowSize = new HashMap<>();
|
||||||
currentSizes.entrySet().stream()
|
|
||||||
.filter(e -> (
|
currentSizes.forEach((coreName, info) -> {
|
||||||
(Long)e.getValue().getVariable(BYTES_SIZE_PROP) < belowBytes ||
|
if ((Long)info.getVariable(BYTES_SIZE_PROP) < belowBytes ||
|
||||||
(Long)e.getValue().getVariable(DOCS_SIZE_PROP) < belowDocs
|
(Long)info.getVariable(DOCS_SIZE_PROP) < belowDocs) {
|
||||||
) && waitForElapsed(e.getKey(), now, lastEventMap))
|
if (waitForElapsed(coreName, now, lastBelowEventMap)) {
|
||||||
.forEach(e -> {
|
|
||||||
ReplicaInfo info = e.getValue();
|
|
||||||
List<ReplicaInfo> infos = belowSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
|
List<ReplicaInfo> infos = belowSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
|
||||||
if (!infos.contains(info)) {
|
if (!infos.contains(info)) {
|
||||||
if ((Long)e.getValue().getVariable(BYTES_SIZE_PROP) < belowBytes) {
|
if ((Long)info.getVariable(BYTES_SIZE_PROP) < belowBytes) {
|
||||||
info.getVariables().put(VIOLATION_PROP, BELOW_BYTES_PROP);
|
info.getVariables().put(VIOLATION_PROP, BELOW_BYTES_PROP);
|
||||||
} else {
|
} else {
|
||||||
info.getVariables().put(VIOLATION_PROP, BELOW_DOCS_PROP);
|
info.getVariables().put(VIOLATION_PROP, BELOW_DOCS_PROP);
|
||||||
}
|
}
|
||||||
infos.add(info);
|
infos.add(info);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
} else {
|
||||||
|
// no violation - clear waitForElapsed
|
||||||
|
lastBelowEventMap.remove(coreName);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (aboveSize.isEmpty() && belowSize.isEmpty()) {
|
if (aboveSize.isEmpty() && belowSize.isEmpty()) {
|
||||||
|
log.trace("NO VIOLATIONS: Now={}", now);
|
||||||
|
log.trace("lastAbove={}", lastAboveEventMap);
|
||||||
|
log.trace("lastBelow={}", lastBelowEventMap);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,7 +349,7 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
TriggerEvent.Op op = new TriggerEvent.Op(aboveOp);
|
TriggerEvent.Op op = new TriggerEvent.Op(aboveOp);
|
||||||
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
|
op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
|
||||||
ops.add(op);
|
ops.add(op);
|
||||||
Long time = lastEventMap.get(r.getCore());
|
Long time = lastAboveEventMap.get(r.getCore());
|
||||||
if (time != null && eventTime.get() > time) {
|
if (time != null && eventTime.get() > time) {
|
||||||
eventTime.set(time);
|
eventTime.set(time);
|
||||||
}
|
}
|
||||||
|
@ -360,11 +383,11 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(0).getShard()));
|
op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(0).getShard()));
|
||||||
op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(1).getShard()));
|
op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(1).getShard()));
|
||||||
ops.add(op);
|
ops.add(op);
|
||||||
Long time = lastEventMap.get(replicas.get(0).getCore());
|
Long time = lastBelowEventMap.get(replicas.get(0).getCore());
|
||||||
if (time != null && eventTime.get() > time) {
|
if (time != null && eventTime.get() > time) {
|
||||||
eventTime.set(time);
|
eventTime.set(time);
|
||||||
}
|
}
|
||||||
time = lastEventMap.get(replicas.get(1).getCore());
|
time = lastBelowEventMap.get(replicas.get(1).getCore());
|
||||||
if (time != null && eventTime.get() > time) {
|
if (time != null && eventTime.get() > time) {
|
||||||
eventTime.set(time);
|
eventTime.set(time);
|
||||||
}
|
}
|
||||||
|
@ -376,11 +399,11 @@ public class IndexSizeTrigger extends TriggerBase {
|
||||||
if (processor.process(new IndexSizeEvent(getName(), eventTime.get(), ops, aboveSize, belowSize))) {
|
if (processor.process(new IndexSizeEvent(getName(), eventTime.get(), ops, aboveSize, belowSize))) {
|
||||||
// update last event times
|
// update last event times
|
||||||
aboveSize.forEach((coll, replicas) -> {
|
aboveSize.forEach((coll, replicas) -> {
|
||||||
replicas.forEach(r -> lastEventMap.put(r.getCore(), now));
|
replicas.forEach(r -> lastAboveEventMap.put(r.getCore(), now));
|
||||||
});
|
});
|
||||||
belowSize.forEach((coll, replicas) -> {
|
belowSize.forEach((coll, replicas) -> {
|
||||||
lastEventMap.put(replicas.get(0).getCore(), now);
|
lastBelowEventMap.put(replicas.get(0).getCore(), now);
|
||||||
lastEventMap.put(replicas.get(1).getCore(), now);
|
lastBelowEventMap.put(replicas.get(1).getCore(), now);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,7 +136,6 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 21-May-2018
|
|
||||||
public void testTrigger() throws Exception {
|
public void testTrigger() throws Exception {
|
||||||
String collectionName = "testTrigger_collection";
|
String collectionName = "testTrigger_collection";
|
||||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||||
|
@ -234,7 +233,6 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 21-May-2018
|
|
||||||
public void testSplitIntegration() throws Exception {
|
public void testSplitIntegration() throws Exception {
|
||||||
String collectionName = "testSplitIntegration_collection";
|
String collectionName = "testSplitIntegration_collection";
|
||||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||||
|
@ -347,7 +345,6 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 21-May-2018
|
|
||||||
public void testMergeIntegration() throws Exception {
|
public void testMergeIntegration() throws Exception {
|
||||||
String collectionName = "testMergeIntegration_collection";
|
String collectionName = "testMergeIntegration_collection";
|
||||||
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
|
||||||
|
|
Loading…
Reference in New Issue