diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java index 4cb15ead875..923a27a2a87 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java @@ -219,7 +219,7 @@ public class ComputePlanAction extends TriggerActionBase { TriggerEvent.Op op = ops.get(start); suggester = session.getSuggester(op.getAction()); if (suggester instanceof UnsupportedSuggester) { - List unsupportedOps = (List)context.getProperties().computeIfAbsent("unsupportedOps", k -> new ArrayList()); + List unsupportedOps = (List)context.getProperties().computeIfAbsent(TriggerEvent.UNSUPPORTED_OPS, k -> new ArrayList()); unsupportedOps.add(op); } for (Map.Entry e : op.getHints().entrySet()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java index 907309dce8f..8e3a3488b5f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java @@ -21,7 +21,8 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -29,6 +30,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.common.MapWriter; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; import org.apache.solr.util.IdUtils; @@ -42,8 +44,9 @@ public class TriggerEvent implements MapWriter { public static final String NODE_NAMES = "nodeNames"; public static final String EVENT_TIMES = "eventTimes"; public static final String REQUESTED_OPS = "requestedOps"; + public static final String UNSUPPORTED_OPS = "unsupportedOps"; - public static final class Op { + public static final class Op implements MapWriter { private final CollectionParams.CollectionAction action; private final EnumMap hints = new EnumMap<>(Suggester.Hint.class); @@ -60,7 +63,7 @@ public class TriggerEvent implements MapWriter { hint.validator.accept(value); if (hint.multiValued) { Collection values = value instanceof Collection ? (Collection) value : Collections.singletonList(value); - ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values); + ((Set) hints.computeIfAbsent(hint, h -> new LinkedHashSet<>())).addAll(values); } else { hints.put(hint, value == null ? null : String.valueOf(value)); } @@ -74,6 +77,50 @@ public class TriggerEvent implements MapWriter { return hints; } + @Override + public void writeMap(EntryWriter ew) throws IOException { + ew.put("action", action); + ew.put("hints", hints); + } + + public static Op fromMap(Map map) { + if (!map.containsKey("action")) { + return null; + } + CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(String.valueOf(map.get("action"))); + if (action == null) { + return null; + } + Op op = new Op(action); + Map hints = (Map)map.get("hints"); + if (hints != null && !hints.isEmpty()) { + hints.forEach((k, v) -> { + Suggester.Hint h = Suggester.Hint.get(k.toString()); + if (h == null) { + return; + } + if (!(v instanceof Collection)) { + v = Collections.singletonList(v); + } + ((Collection)v).forEach(vv -> { + if (vv instanceof Map) { + // maybe it's a Pair? + Map m = (Map)vv; + if (m.containsKey("first") && m.containsKey("second")) { + Pair p = Pair.parse(m); + if (p != null) { + op.addHint(h, p); + return; + } + } + } + op.addHint(h, vv); + }); + }); + } + return op; + } + @Override public String toString() { return "Op{" + @@ -231,4 +278,32 @@ public class TriggerEvent implements MapWriter { public String toString() { return Utils.toJSONString(this); } + + public static TriggerEvent fromMap(Map map) { + String id = (String)map.get("id"); + String source = (String)map.get("source"); + long eventTime = ((Number)map.get("eventTime")).longValue(); + TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType")); + Map properties = (Map)map.get("properties"); + // properly deserialize some well-known complex properties + fixOps(TriggerEvent.REQUESTED_OPS, properties); + fixOps(TriggerEvent.UNSUPPORTED_OPS, properties); + TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties); + return res; + } + + public static void fixOps(String type, Map properties) { + List ops = (List)properties.get(type); + if (ops != null && !ops.isEmpty()) { + for (int i = 0; i < ops.size(); i++) { + Object o = ops.get(i); + if (o instanceof Map) { + TriggerEvent.Op op = TriggerEvent.Op.fromMap((Map)o); + if (op != null) { + ops.set(i, op); + } + } + } + } + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java index fd587de256c..057d7922494 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.solr.client.solrj.cloud.DistributedQueue; import org.apache.solr.client.solrj.cloud.SolrCloudManager; -import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType; import org.apache.solr.cloud.Stats; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; @@ -108,12 +107,7 @@ public class TriggerEventQueue { } private TriggerEvent fromMap(Map map) { - String id = (String)map.get("id"); - String source = (String)map.get("source"); - long eventTime = ((Number)map.get("eventTime")).longValue(); - TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType")); - Map properties = (Map)map.get("properties"); - TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties); + TriggerEvent res = TriggerEvent.fromMap(map); res.getProperties().put(DEQUEUE_TIME, timeSource.getTimeNs()); return res; } diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/CapturedEvent.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/CapturedEvent.java index e08d37bfeda..462e94837df 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/CapturedEvent.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/CapturedEvent.java @@ -39,6 +39,8 @@ public class CapturedEvent { TriggerEvent event, String message) { if (context != null) { context.toMap(this.context); + TriggerEvent.fixOps("properties." + TriggerEvent.REQUESTED_OPS, this.context); + TriggerEvent.fixOps("properties." + TriggerEvent.UNSUPPORTED_OPS, this.context); } this.config = config; this.stage = stage; diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java index f960f0fcd54..faabda18eed 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java @@ -93,7 +93,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { configureCluster(2) .addConfig("conf", configset("cloud-minimal")) .configure(); - if (random().nextBoolean()) { + if (random().nextBoolean() || true) { cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); solrClient = cluster.getSolrClient(); loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader(); @@ -268,7 +268,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { String setListenerCommand = "{" + "'set-listener' : " + "{" + - "'name' : 'capturing'," + + "'name' : 'capturing2'," + "'trigger' : 'index_size_trigger2'," + "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + "'beforeAction' : ['compute_plan','execute_plan']," + @@ -316,8 +316,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { assertTrue("did not finish processing in time", await); CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS, CloudTestUtils.clusterShape(6, 2, true, true)); assertEquals(1, listenerEvents.size()); - List events = listenerEvents.get("capturing"); - assertNotNull("'capturing' events not found", events); + List events = listenerEvents.get("capturing2"); + assertNotNull("'capturing2' events not found", events); assertEquals("events: " + events, 6, events.size()); assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); @@ -386,7 +386,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { String setListenerCommand = "{" + "'set-listener' : " + "{" + - "'name' : 'capturing'," + + "'name' : 'capturing3'," + "'trigger' : 'index_size_trigger3'," + "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + "'beforeAction' : ['compute_plan','execute_plan']," + @@ -432,8 +432,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); assertTrue("did not finish processing in time", await); assertEquals(1, listenerEvents.size()); - List events = listenerEvents.get("capturing"); - assertNotNull("'capturing' events not found", events); + List events = listenerEvents.get("capturing3"); + assertNotNull("'capturing3' events not found", events); assertEquals("events: " + events, 6, events.size()); assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); @@ -531,7 +531,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { String setListenerCommand = "{" + "'set-listener' : " + "{" + - "'name' : 'capturing'," + + "'name' : 'capturing4'," + "'trigger' : 'index_size_trigger4'," + "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," + "'beforeAction' : ['compute_plan','execute_plan']," + @@ -571,8 +571,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); assertTrue("did not finish processing in time", await); assertEquals(1, listenerEvents.size()); - List events = listenerEvents.get("capturing"); - assertNotNull("'capturing' events not found", events); + List events = listenerEvents.get("capturing4"); + assertNotNull("'capturing4' events not found", events); assertEquals("events: " + events, 6, events.size()); assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); @@ -651,8 +651,8 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase { await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS); assertTrue("did not finish processing in time", await); assertEquals(1, listenerEvents.size()); - events = listenerEvents.get("capturing"); - assertNotNull("'capturing' events not found", events); + events = listenerEvents.get("capturing4"); + assertNotNull("'capturing4' events not found", events); assertEquals("events: " + events, 6, events.size()); assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage); assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerEventQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerEventQueueTest.java new file mode 100644 index 00000000000..6e83c6bb778 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerEventQueueTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling; + +import java.util.Collection; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.cloud.DistribStateManager; +import org.apache.solr.client.solrj.cloud.DistributedQueueFactory; +import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.autoscaling.Suggester; +import org.apache.solr.cloud.autoscaling.sim.GenericDistributedQueueFactory; +import org.apache.solr.cloud.autoscaling.sim.SimDistribStateManager; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.TimeSource; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * + */ +public class TriggerEventQueueTest extends SolrTestCaseJ4 { + + SolrCloudManager cloudManager; + + @Before + public void init() throws Exception { + assumeWorkingMockito(); + cloudManager = mock(SolrCloudManager.class); + DistribStateManager stateManager = new SimDistribStateManager(); + when(cloudManager.getDistribStateManager()).thenReturn(stateManager); + DistributedQueueFactory queueFactory = new GenericDistributedQueueFactory(stateManager); + when(cloudManager.getDistributedQueueFactory()).thenReturn(queueFactory); + when(cloudManager.getTimeSource()).thenReturn(TimeSource.NANO_TIME); + } + + @Test + public void testSerialization() throws Exception { + TriggerEventQueue queue = new TriggerEventQueue(cloudManager, "test", null); + Map hotHosts = new HashMap<>(); + hotHosts.put("host1", 1); + hotHosts.put("host2", 1); + TriggerEvent ev = new MetricTrigger.MetricBreachedEvent("testTrigger", "testCollection", "shard1", + CollectionParams.CollectionAction.ADDREPLICA.toLower(), cloudManager.getTimeSource().getTimeNs(), + "foo", hotHosts); + queue.offerEvent(ev); + ev = queue.pollEvent(); + assertNotNull(ev); + Object ops = ev.getProperties().get(TriggerEvent.REQUESTED_OPS); + assertNotNull(ops); + assertTrue(ops.getClass().getName(), ops instanceof List); + List requestedOps = (List)ops; + assertEquals(requestedOps.toString(), 2, requestedOps.size()); + requestedOps.forEach(op -> { + assertTrue(op.getClass().getName(), op instanceof TriggerEvent.Op); + TriggerEvent.Op operation = (TriggerEvent.Op)op; + assertEquals(op.toString(), CollectionParams.CollectionAction.ADDREPLICA, operation.getAction()); + EnumMap hints = ((TriggerEvent.Op) op).getHints(); + assertEquals(hints.toString(), 2, hints.size()); + Object o = hints.get(Suggester.Hint.COLL_SHARD); + assertNotNull(Suggester.Hint.COLL_SHARD.toString(), o); + assertTrue(o.getClass().getName(), o instanceof Collection); + Collection col = (Collection)o; + assertEquals(col.toString(), 1, col.size()); + o = col.iterator().next(); + assertTrue(o.getClass().getName(), o instanceof Pair); + o = hints.get(Suggester.Hint.SRC_NODE); + assertNotNull(Suggester.Hint.SRC_NODE.toString(), o); + assertTrue(o.getClass().getName(), o instanceof Collection); + col = (Collection)o; + assertEquals(col.toString(), 1, col.size()); + o = col.iterator().next(); + assertTrue(o.getClass().getName(), o instanceof String); + }); + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimLargeCluster.java index decb585ba2c..825d3901edd 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimLargeCluster.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimLargeCluster.java @@ -691,7 +691,8 @@ public class TestSimLargeCluster extends SimSolrCloudTestCase { boolean await = triggerFinishedLatch.await(waitForSeconds * 20000 / SPEED, TimeUnit.MILLISECONDS); assertTrue("The trigger did not fire at all", await); // wait for listener to capture the SUCCEEDED stage - cluster.getTimeSource().sleep(2000); + cluster.getTimeSource().sleep(5000); + assertNotNull(listenerEvents.toString(), listenerEvents.get("srt")); assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size()); CapturedEvent ev = listenerEvents.get("srt").get(0); assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType()); diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java index c8105e18400..b3896329174 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java @@ -155,6 +155,22 @@ public class Utils { return writer; } + private static class MapWriterJSONWriter extends JSONWriter { + + public MapWriterJSONWriter(CharArr out, int indentSize) { + super(out, indentSize); + } + + @Override + public void handleUnknownClass(Object o) { + if (o instanceof MapWriter) { + Map m = ((MapWriter)o).toMap(new LinkedHashMap<>()); + write(m); + } else { + super.handleUnknownClass(o); + } + } + } public static byte[] toJSON(Object o) { if(o == null) return new byte[0]; @@ -166,7 +182,7 @@ public class Utils { o = ((IteratorWriter)o).toList(new ArrayList<>()); } } - new JSONWriter(out, 2).write(o); // indentation by default + new MapWriterJSONWriter(out, 2).write(o); // indentation by default return toUTF8(out); }