SOLR-12392: Fix several bugs in tests and in trigger event serialization.
Add better support for converting MapWriter instances to JSON.
commit 8d1dce933f (parent f762953aab)
@@ -219,7 +219,7 @@ public class ComputePlanAction extends TriggerActionBase {
           TriggerEvent.Op op = ops.get(start);
           suggester = session.getSuggester(op.getAction());
           if (suggester instanceof UnsupportedSuggester) {
-            List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)context.getProperties().computeIfAbsent("unsupportedOps", k -> new ArrayList<TriggerEvent.Op>());
+            List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)context.getProperties().computeIfAbsent(TriggerEvent.UNSUPPORTED_OPS, k -> new ArrayList<TriggerEvent.Op>());
             unsupportedOps.add(op);
           }
           for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {

@@ -21,7 +21,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -29,6 +30,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.IdUtils;

@@ -42,8 +44,9 @@ public class TriggerEvent implements MapWriter {
   public static final String NODE_NAMES = "nodeNames";
   public static final String EVENT_TIMES = "eventTimes";
   public static final String REQUESTED_OPS = "requestedOps";
+  public static final String UNSUPPORTED_OPS = "unsupportedOps";
 
-  public static final class Op {
+  public static final class Op implements MapWriter {
     private final CollectionParams.CollectionAction action;
     private final EnumMap<Suggester.Hint, Object> hints = new EnumMap<>(Suggester.Hint.class);
 

@@ -60,7 +63,7 @@ public class TriggerEvent implements MapWriter {
       hint.validator.accept(value);
       if (hint.multiValued) {
         Collection<?> values = value instanceof Collection ? (Collection) value : Collections.singletonList(value);
-        ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
+        ((Set) hints.computeIfAbsent(hint, h -> new LinkedHashSet<>())).addAll(values);
       } else {
         hints.put(hint, value == null ? null : String.valueOf(value));
       }

@@ -74,6 +77,50 @@ public class TriggerEvent implements MapWriter {
       return hints;
     }
 
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      ew.put("action", action);
+      ew.put("hints", hints);
+    }
+
+    public static Op fromMap(Map<String, Object> map) {
+      if (!map.containsKey("action")) {
+        return null;
+      }
+      CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(String.valueOf(map.get("action")));
+      if (action == null) {
+        return null;
+      }
+      Op op = new Op(action);
+      Map<Object, Object> hints = (Map<Object, Object>)map.get("hints");
+      if (hints != null && !hints.isEmpty()) {
+        hints.forEach((k, v) -> {
+          Suggester.Hint h = Suggester.Hint.get(k.toString());
+          if (h == null) {
+            return;
+          }
+          if (!(v instanceof Collection)) {
+            v = Collections.singletonList(v);
+          }
+          ((Collection)v).forEach(vv -> {
+            if (vv instanceof Map) {
+              // maybe it's a Pair?
+              Map<String, Object> m = (Map<String, Object>)vv;
+              if (m.containsKey("first") && m.containsKey("second")) {
+                Pair p = Pair.parse(m);
+                if (p != null) {
+                  op.addHint(h, p);
+                  return;
+                }
+              }
+            }
+            op.addHint(h, vv);
+          });
+        });
+      }
+      return op;
+    }
+
     @Override
     public String toString() {
       return "Op{" +

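Note (illustrative, not part of the patch): because Op now implements MapWriter and gains a static fromMap, an Op can be flattened to a plain Map and rebuilt from one. A minimal sketch, assuming the single-argument Op constructor and hint values like those used elsewhere in this diff:

    // Sketch only: flatten an Op to a Map (as the JSON layer does) and rebuild it.
    TriggerEvent.Op op = new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA);
    op.addHint(Suggester.Hint.SRC_NODE, "127.0.0.1:8983_solr");   // hint value is illustrative
    Map<String, Object> flat = op.toMap(new LinkedHashMap<>());   // writeMap() emits "action" and "hints"
    TriggerEvent.Op restored = TriggerEvent.Op.fromMap(flat);     // returns null if "action" is missing or unknown
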
@@ -231,4 +278,32 @@ public class TriggerEvent implements MapWriter {
   public String toString() {
     return Utils.toJSONString(this);
   }
+
+  public static TriggerEvent fromMap(Map<String, Object> map) {
+    String id = (String)map.get("id");
+    String source = (String)map.get("source");
+    long eventTime = ((Number)map.get("eventTime")).longValue();
+    TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType"));
+    Map<String, Object> properties = (Map<String, Object>)map.get("properties");
+    // properly deserialize some well-known complex properties
+    fixOps(TriggerEvent.REQUESTED_OPS, properties);
+    fixOps(TriggerEvent.UNSUPPORTED_OPS, properties);
+    TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties);
+    return res;
+  }
+
+  public static void fixOps(String type, Map<String, Object> properties) {
+    List<Object> ops = (List<Object>)properties.get(type);
+    if (ops != null && !ops.isEmpty()) {
+      for (int i = 0; i < ops.size(); i++) {
+        Object o = ops.get(i);
+        if (o instanceof Map) {
+          TriggerEvent.Op op = TriggerEvent.Op.fromMap((Map)o);
+          if (op != null) {
+            ops.set(i, op);
+          }
+        }
+      }
+    }
+  }
 }

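Note (illustrative, not part of the patch): TriggerEvent.fromMap plus fixOps is the rehydration side of the serialization fix; ops stored under REQUESTED_OPS / UNSUPPORTED_OPS come back as TriggerEvent.Op instances instead of plain maps. A rough sketch of the intended flow, assuming the payload was produced by Utils.toJSON(event) and that Utils.fromJSON is used for decoding:

    // Sketch only: rebuild an event from JSON bytes written by the other side of the queue.
    static TriggerEvent rehydrate(byte[] jsonBytes) {
      @SuppressWarnings("unchecked")
      Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(jsonBytes); // assumes Utils.fromJSON(byte[])
      // fromMap calls fixOps(REQUESTED_OPS, ...) and fixOps(UNSUPPORTED_OPS, ...) internally.
      return TriggerEvent.fromMap(map);
    }
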
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.solr.client.solrj.cloud.DistributedQueue;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;

@@ -108,12 +107,7 @@ public class TriggerEventQueue {
   }
 
   private TriggerEvent fromMap(Map<String, Object> map) {
-    String id = (String)map.get("id");
-    String source = (String)map.get("source");
-    long eventTime = ((Number)map.get("eventTime")).longValue();
-    TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType"));
-    Map<String, Object> properties = (Map<String, Object>)map.get("properties");
-    TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties);
+    TriggerEvent res = TriggerEvent.fromMap(map);
     res.getProperties().put(DEQUEUE_TIME, timeSource.getTimeNs());
     return res;
   }

@@ -39,6 +39,8 @@ public class CapturedEvent {
                        TriggerEvent event, String message) {
     if (context != null) {
       context.toMap(this.context);
+      TriggerEvent.fixOps("properties." + TriggerEvent.REQUESTED_OPS, this.context);
+      TriggerEvent.fixOps("properties." + TriggerEvent.UNSUPPORTED_OPS, this.context);
     }
     this.config = config;
     this.stage = stage;

@@ -93,7 +93,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     configureCluster(2)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
-    if (random().nextBoolean()) {
+    if (random().nextBoolean() || true) {
       cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
       solrClient = cluster.getSolrClient();
       loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();

@@ -268,7 +268,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     String setListenerCommand = "{" +
         "'set-listener' : " +
         "{" +
-        "'name' : 'capturing'," +
+        "'name' : 'capturing2'," +
         "'trigger' : 'index_size_trigger2'," +
         "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
         "'beforeAction' : ['compute_plan','execute_plan']," +

@@ -316,8 +316,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     assertTrue("did not finish processing in time", await);
     CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS, CloudTestUtils.clusterShape(6, 2, true, true));
     assertEquals(1, listenerEvents.size());
-    List<CapturedEvent> events = listenerEvents.get("capturing");
-    assertNotNull("'capturing' events not found", events);
+    List<CapturedEvent> events = listenerEvents.get("capturing2");
+    assertNotNull("'capturing2' events not found", events);
     assertEquals("events: " + events, 6, events.size());
     assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
     assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);

@@ -386,7 +386,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     String setListenerCommand = "{" +
         "'set-listener' : " +
         "{" +
-        "'name' : 'capturing'," +
+        "'name' : 'capturing3'," +
         "'trigger' : 'index_size_trigger3'," +
         "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
         "'beforeAction' : ['compute_plan','execute_plan']," +

@@ -432,8 +432,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("did not finish processing in time", await);
     assertEquals(1, listenerEvents.size());
-    List<CapturedEvent> events = listenerEvents.get("capturing");
-    assertNotNull("'capturing' events not found", events);
+    List<CapturedEvent> events = listenerEvents.get("capturing3");
+    assertNotNull("'capturing3' events not found", events);
     assertEquals("events: " + events, 6, events.size());
     assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
     assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);

@@ -531,7 +531,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     String setListenerCommand = "{" +
         "'set-listener' : " +
         "{" +
-        "'name' : 'capturing'," +
+        "'name' : 'capturing4'," +
         "'trigger' : 'index_size_trigger4'," +
         "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
         "'beforeAction' : ['compute_plan','execute_plan']," +

@@ -571,8 +571,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("did not finish processing in time", await);
     assertEquals(1, listenerEvents.size());
-    List<CapturedEvent> events = listenerEvents.get("capturing");
-    assertNotNull("'capturing' events not found", events);
+    List<CapturedEvent> events = listenerEvents.get("capturing4");
+    assertNotNull("'capturing4' events not found", events);
     assertEquals("events: " + events, 6, events.size());
     assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
     assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);

@@ -651,8 +651,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("did not finish processing in time", await);
     assertEquals(1, listenerEvents.size());
-    events = listenerEvents.get("capturing");
-    assertNotNull("'capturing' events not found", events);
+    events = listenerEvents.get("capturing4");
+    assertNotNull("'capturing4' events not found", events);
     assertEquals("events: " + events, 6, events.size());
     assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
     assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.cloud.autoscaling.sim.GenericDistributedQueueFactory;
+import org.apache.solr.cloud.autoscaling.sim.SimDistribStateManager;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.TimeSource;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class TriggerEventQueueTest extends SolrTestCaseJ4 {
+
+  SolrCloudManager cloudManager;
+
+  @Before
+  public void init() throws Exception {
+    assumeWorkingMockito();
+    cloudManager = mock(SolrCloudManager.class);
+    DistribStateManager stateManager = new SimDistribStateManager();
+    when(cloudManager.getDistribStateManager()).thenReturn(stateManager);
+    DistributedQueueFactory queueFactory = new GenericDistributedQueueFactory(stateManager);
+    when(cloudManager.getDistributedQueueFactory()).thenReturn(queueFactory);
+    when(cloudManager.getTimeSource()).thenReturn(TimeSource.NANO_TIME);
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    TriggerEventQueue queue = new TriggerEventQueue(cloudManager, "test", null);
+    Map<String, Number> hotHosts = new HashMap<>();
+    hotHosts.put("host1", 1);
+    hotHosts.put("host2", 1);
+    TriggerEvent ev = new MetricTrigger.MetricBreachedEvent("testTrigger", "testCollection", "shard1",
+        CollectionParams.CollectionAction.ADDREPLICA.toLower(), cloudManager.getTimeSource().getTimeNs(),
+        "foo", hotHosts);
+    queue.offerEvent(ev);
+    ev = queue.pollEvent();
+    assertNotNull(ev);
+    Object ops = ev.getProperties().get(TriggerEvent.REQUESTED_OPS);
+    assertNotNull(ops);
+    assertTrue(ops.getClass().getName(), ops instanceof List);
+    List<Object> requestedOps = (List<Object>)ops;
+    assertEquals(requestedOps.toString(), 2, requestedOps.size());
+    requestedOps.forEach(op -> {
+      assertTrue(op.getClass().getName(), op instanceof TriggerEvent.Op);
+      TriggerEvent.Op operation = (TriggerEvent.Op)op;
+      assertEquals(op.toString(), CollectionParams.CollectionAction.ADDREPLICA, operation.getAction());
+      EnumMap<Suggester.Hint, Object> hints = ((TriggerEvent.Op) op).getHints();
+      assertEquals(hints.toString(), 2, hints.size());
+      Object o = hints.get(Suggester.Hint.COLL_SHARD);
+      assertNotNull(Suggester.Hint.COLL_SHARD.toString(), o);
+      assertTrue(o.getClass().getName(), o instanceof Collection);
+      Collection<Object> col = (Collection<Object>)o;
+      assertEquals(col.toString(), 1, col.size());
+      o = col.iterator().next();
+      assertTrue(o.getClass().getName(), o instanceof Pair);
+      o = hints.get(Suggester.Hint.SRC_NODE);
+      assertNotNull(Suggester.Hint.SRC_NODE.toString(), o);
+      assertTrue(o.getClass().getName(), o instanceof Collection);
+      col = (Collection<Object>)o;
+      assertEquals(col.toString(), 1, col.size());
+      o = col.iterator().next();
+      assertTrue(o.getClass().getName(), o instanceof String);
+    });
+  }
+}

@@ -691,7 +691,8 @@ public class TestSimLargeCluster extends SimSolrCloudTestCase {
     boolean await = triggerFinishedLatch.await(waitForSeconds * 20000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("The trigger did not fire at all", await);
     // wait for listener to capture the SUCCEEDED stage
-    cluster.getTimeSource().sleep(2000);
+    cluster.getTimeSource().sleep(5000);
+    assertNotNull(listenerEvents.toString(), listenerEvents.get("srt"));
     assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
     CapturedEvent ev = listenerEvents.get("srt").get(0);
     assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType());

@@ -155,6 +155,22 @@ public class Utils {
     return writer;
   }
 
+  private static class MapWriterJSONWriter extends JSONWriter {
+
+    public MapWriterJSONWriter(CharArr out, int indentSize) {
+      super(out, indentSize);
+    }
+
+    @Override
+    public void handleUnknownClass(Object o) {
+      if (o instanceof MapWriter) {
+        Map m = ((MapWriter)o).toMap(new LinkedHashMap<>());
+        write(m);
+      } else {
+        super.handleUnknownClass(o);
+      }
+    }
+  }
+
   public static byte[] toJSON(Object o) {
     if(o == null) return new byte[0];

@@ -166,7 +182,7 @@ public class Utils {
       o = ((IteratorWriter)o).toList(new ArrayList<>());
     }
   }
-    new JSONWriter(out, 2).write(o); // indentation by default
+    new MapWriterJSONWriter(out, 2).write(o); // indentation by default
     return toUTF8(out);
   }
 
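Note (illustrative, not part of the patch): the MapWriterJSONWriter hook means a MapWriter value nested inside ordinary collections is expanded through MapWriter.toMap(...) rather than falling through to JSONWriter's default unknown-class handling. A small sketch, with hypothetical payload contents:

    // Sketch only: a MapWriter (here a TriggerEvent.Op) nested in a Map now serializes as a JSON object.
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("op", new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA)); // a MapWriter, not a Map
    byte[] json = Utils.toJSON(payload);
    // expected shape, roughly: {"op": {"action": ..., "hints": {...}}}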