mirror of https://github.com/apache/lucene.git
SOLR-12709: Several fixes to the simulator and its .system collection auto-creation.
This commit is contained in:
parent
4ca885ac9e
commit
2201b65266
|
@ -97,14 +97,14 @@ public class CloudTestUtils {
|
|||
// due to the way we manage collections in SimClusterStateProvider a null here
|
||||
// can mean that a collection is still being created but has no replicas
|
||||
if (coll == null) { // does not yet exist?
|
||||
timeout.sleep(50);
|
||||
timeout.sleep(100);
|
||||
continue;
|
||||
}
|
||||
if (predicate.matches(state.getLiveNodes(), coll)) {
|
||||
log.trace("-- predicate matched with state {}", state);
|
||||
return timeout.timeElapsed(TimeUnit.MILLISECONDS);
|
||||
}
|
||||
timeout.sleep(50);
|
||||
timeout.sleep(100);
|
||||
if (timeout.timeLeft(TimeUnit.MILLISECONDS) < timeWarn) {
|
||||
log.trace("-- still not matching predicate: {}", state);
|
||||
}
|
||||
|
|
|
@ -678,8 +678,16 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
|
|||
if (m.get("success") != null) {
|
||||
replicas.incrementAndGet();
|
||||
} else if (m.get("status") != null) {
|
||||
NamedList<Object> status = (NamedList<Object>)m.get("status");
|
||||
if ("completed".equals(status.get("state"))) {
|
||||
Object status = m.get("status");
|
||||
String state;
|
||||
if (status instanceof Map) {
|
||||
state = (String)((Map)status).get("state");
|
||||
} else if (status instanceof NamedList) {
|
||||
state = (String)((NamedList)status).get("state");
|
||||
} else {
|
||||
throw new IllegalArgumentException("unsupported status format: " + status.getClass().getName() + ", " + status);
|
||||
}
|
||||
if ("completed".equals(state)) {
|
||||
nodes.incrementAndGet();
|
||||
} else {
|
||||
fail("unexpected DELETENODE status: " + m);
|
||||
|
|
|
@ -542,6 +542,7 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
triggerThread.interrupt();
|
||||
IOUtils.closeQuietly(triggerThread);
|
||||
if (killNodeId != null) {
|
||||
log.info(" = killing node " + killNodeId);
|
||||
simRemoveNode(killNodeId, false);
|
||||
}
|
||||
objectCache.clear();
|
||||
|
@ -746,7 +747,7 @@ public class SimCloudManager implements SolrCloudManager {
|
|||
if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
|
||||
continue;
|
||||
}
|
||||
eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
|
||||
eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
|
||||
.incrementAndGet();
|
||||
}
|
||||
|
|
|
@ -155,7 +155,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
private Map<String, Object> lastSavedProperties = null;
|
||||
|
||||
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
|
||||
private AtomicBoolean saveClusterState = new AtomicBoolean();
|
||||
|
||||
private Random bulkUpdateRandom = new Random(0);
|
||||
|
||||
|
@ -619,14 +618,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
* Save clusterstate.json to {@link DistribStateManager}.
|
||||
* @return saved state
|
||||
*/
|
||||
private synchronized ClusterState saveClusterState(ClusterState state) throws IOException {
|
||||
private ClusterState saveClusterState(ClusterState state) throws IOException {
|
||||
ensureNotClosed();
|
||||
byte[] data = Utils.toJSON(state);
|
||||
try {
|
||||
VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
|
||||
int version = oldData != null ? oldData.getVersion() : -1;
|
||||
Assert.assertEquals(clusterStateVersion, version + 1);
|
||||
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
|
||||
log.debug("** saved cluster state version " + version);
|
||||
log.debug("** saved cluster state version " + (version + 1));
|
||||
clusterStateVersion++;
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
|
@ -754,7 +754,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
synchronized (ri) {
|
||||
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
|
||||
}
|
||||
log.debug("-- elected new leader for {} / {}: {}", collection, s.getName(), ri);
|
||||
log.debug("-- elected new leader for {} / {} (currentVersion={}): {}", collection,
|
||||
s.getName(), clusterStateVersion, ri);
|
||||
stateChanged.set(true);
|
||||
}
|
||||
} else {
|
||||
|
@ -762,7 +763,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
if (stateChanged.get() || saveState) {
|
||||
collectionsStatesRef.set(null);
|
||||
saveClusterState.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -778,6 +778,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
|
||||
final String collectionName = props.getStr(NAME);
|
||||
log.debug("-- simCreateCollection {}, currentVersion={}", collectionName, clusterStateVersion);
|
||||
|
||||
String router = props.getStr("router.name", DocRouter.DEFAULT_NAME);
|
||||
String policy = props.getStr(Policy.POLICY);
|
||||
|
@ -808,12 +809,47 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
final String withCollectionShard = wcShard;
|
||||
|
||||
ZkWriteCommand cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
|
||||
lock.lockInterruptibly();
|
||||
ZkWriteCommand cmd = ZkWriteCommand.noop();
|
||||
try {
|
||||
cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
|
||||
if (cmd.noop) {
|
||||
log.warn("Collection {} already exists. exit", collectionName);
|
||||
log.debug("-- collection: {}, clusterState: {}", collectionName, clusterState);
|
||||
results.add("success", "no-op");
|
||||
return;
|
||||
}
|
||||
// add collection props
|
||||
DocCollection coll = cmd.collection;
|
||||
collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).putAll(coll.getProperties());
|
||||
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
|
||||
// add slice props
|
||||
coll.getSlices().forEach(s -> {
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll.getName(), c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(s.getName(), slice -> new ConcurrentHashMap<>());
|
||||
s.getProperties().forEach((k, v) -> {
|
||||
if (k != null && v != null) {
|
||||
sliceProps.put(k, v);
|
||||
}
|
||||
});
|
||||
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(s.getName(), sh -> new ArrayList<>());
|
||||
});
|
||||
|
||||
// modify the `withCollection` and store this new collection's name with it
|
||||
if (withCollection != null) {
|
||||
ZkNodeProps message = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
|
||||
ZkStateReader.COLLECTION_PROP, withCollection,
|
||||
CollectionAdminParams.COLOCATED_WITH, collectionName);
|
||||
cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message);
|
||||
}
|
||||
// force recreation of collection states
|
||||
collectionsStatesRef.set(null);
|
||||
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
opDelays.computeIfAbsent(collectionName, c -> new HashMap<>()).putAll(defaultOpDelays);
|
||||
|
||||
opDelay(collectionName, CollectionParams.CollectionAction.CREATE.name());
|
||||
|
@ -883,28 +919,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
// add collection props
|
||||
DocCollection coll = cmd.collection;
|
||||
collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).putAll(coll.getProperties());
|
||||
// add slice props
|
||||
coll.getSlices().forEach(s -> {
|
||||
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll.getName(), c -> new ConcurrentHashMap<>())
|
||||
.computeIfAbsent(s.getName(), slice -> new ConcurrentHashMap<>());
|
||||
s.getProperties().forEach((k, v) -> {
|
||||
if (k != null && v != null) {
|
||||
sliceProps.put(k, v);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// modify the `withCollection` and store this new collection's name with it
|
||||
if (withCollection != null) {
|
||||
ZkNodeProps message = new ZkNodeProps(
|
||||
Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
|
||||
ZkStateReader.COLLECTION_PROP, withCollection,
|
||||
CollectionAdminParams.COLOCATED_WITH, collectionName);
|
||||
cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message);
|
||||
}
|
||||
|
||||
// force recreation of collection states
|
||||
collectionsStatesRef.set(null);
|
||||
|
@ -918,6 +932,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
results.add("success", "");
|
||||
log.debug("-- finished createCollection {}, currentVersion={}", collectionName, clusterStateVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -963,7 +978,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
});
|
||||
collectionsStatesRef.set(null);
|
||||
saveClusterState.set(true);
|
||||
results.add("success", "");
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception", e);
|
||||
|
@ -977,18 +991,20 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
*/
|
||||
public void simDeleteAllCollections() throws Exception {
|
||||
lock.lockInterruptibly();
|
||||
collectionsStatesRef.set(null);
|
||||
try {
|
||||
nodeReplicaMap.clear();
|
||||
colShardReplicaMap.clear();
|
||||
collProperties.clear();
|
||||
sliceProperties.clear();
|
||||
leaderThrottles.clear();
|
||||
nodeReplicaMap.clear();
|
||||
colShardReplicaMap.clear();
|
||||
cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
|
||||
values.put(ImplicitSnitch.CORES, 0);
|
||||
values.put(ImplicitSnitch.DISK, 1000);
|
||||
values.put(ImplicitSnitch.DISK, SimCloudManager.DEFAULT_FREE_DISK);
|
||||
values.put(Variable.Type.TOTALDISK.tagName, SimCloudManager.DEFAULT_TOTAL_DISK);
|
||||
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
|
||||
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
|
||||
});
|
||||
collectionsStatesRef.set(null);
|
||||
saveClusterState.set(true);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -1326,20 +1342,21 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
|
||||
public void createSystemCollection() throws IOException {
|
||||
public synchronized void createSystemCollection() throws IOException {
|
||||
try {
|
||||
if (colShardReplicaMap.containsKey(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
return;
|
||||
}
|
||||
String repFactor = String.valueOf(Math.min(3, liveNodes.size()));
|
||||
ZkNodeProps props = new ZkNodeProps(
|
||||
NAME, CollectionAdminParams.SYSTEM_COLL,
|
||||
REPLICATION_FACTOR, "1",
|
||||
REPLICATION_FACTOR, repFactor,
|
||||
OverseerCollectionMessageHandler.NUM_SLICES, "1",
|
||||
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
|
||||
);
|
||||
simCreateCollection(props, new NamedList());
|
||||
CloudTestUtils.waitForState(cloudManager, CollectionAdminParams.SYSTEM_COLL, 20, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(1, 1, false, true));
|
||||
CloudTestUtils.clusterShape(1, Integer.parseInt(repFactor), false, true));
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
@ -1372,6 +1389,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
if (!colShardReplicaMap.containsKey(collection)) {
|
||||
if (CollectionAdminParams.SYSTEM_COLL.equals(collection)) {
|
||||
// auto-create
|
||||
log.trace("-- auto-create .system when req=" + req);
|
||||
createSystemCollection();
|
||||
} else {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
|
||||
|
@ -2041,25 +2059,22 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
@Override
|
||||
public ClusterState getClusterState() throws IOException {
|
||||
ensureNotClosed();
|
||||
Map<String, DocCollection> states = getCollectionStates();
|
||||
ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
|
||||
if (saveClusterState.getAndSet(false)) {
|
||||
saveClusterState(state);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
// this method uses a simple cache in collectionsStatesRef. Operations that modify
|
||||
// cluster state should always reset this cache so that the changes become visible
|
||||
private Map<String, DocCollection> getCollectionStates() {
|
||||
private Map<String, DocCollection> getCollectionStates() throws IOException {
|
||||
Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
|
||||
if (collectionStates != null) {
|
||||
return collectionStates;
|
||||
}
|
||||
lock.lock();
|
||||
collectionsStatesRef.set(null);
|
||||
saveClusterState.set(true);
|
||||
log.debug("** creating new collection states");
|
||||
log.debug("** creating new collection states, currentVersion={}", clusterStateVersion);
|
||||
try {
|
||||
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
|
||||
nodeReplicaMap.forEach((n, replicas) -> {
|
||||
|
@ -2101,9 +2116,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
|
|||
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
|
||||
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
|
||||
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
|
||||
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion + 1, ZkStateReader.CLUSTER_STATE);
|
||||
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
|
||||
res.put(coll, dc);
|
||||
});
|
||||
saveClusterState(new ClusterState(clusterStateVersion, liveNodes.get(), res));
|
||||
collectionsStatesRef.set(res);
|
||||
return res;
|
||||
} finally {
|
||||
|
|
|
@ -89,8 +89,6 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
// clear any persisted configuration
|
||||
cluster.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1);
|
||||
cluster.getDistribStateManager().setData(ZkStateReader.ROLES, Utils.toJSON(new HashMap<>()), -1);
|
||||
cluster.getSimClusterStateProvider().simDeleteAllCollections();
|
||||
cluster.simClearSystemCollection();
|
||||
cluster.getSimNodeStateProvider().simRemoveDeadNodes();
|
||||
cluster.getSimClusterStateProvider().simRemoveDeadNodes();
|
||||
// restore the expected number of nodes
|
||||
|
@ -110,7 +108,9 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
|
|||
removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
|
||||
cluster.getSimClusterStateProvider().simResetLeaderThrottles();
|
||||
cluster.simRestartOverseer(null);
|
||||
cluster.getTimeSource().sleep(5000);
|
||||
cluster.getSimClusterStateProvider().simDeleteAllCollections();
|
||||
cluster.simClearSystemCollection();
|
||||
cluster.getTimeSource().sleep(10000);
|
||||
cluster.simResetOpCounts();
|
||||
}
|
||||
|
||||
|
|
|
@ -136,7 +136,7 @@ public class TestSimLargeCluster extends SimSolrCloudTestCase {
|
|||
if (!cluster.getSimClusterStateProvider().simListCollections().contains(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
cluster.getSimClusterStateProvider().createSystemCollection();
|
||||
CloudTestUtils.waitForState(cluster, CollectionAdminParams.SYSTEM_COLL, 120, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(1, 1, false, true));
|
||||
CloudTestUtils.clusterShape(1, 3, false, true));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDU
|
|||
/**
|
||||
* An end-to-end integration test for triggers
|
||||
*/
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
|
||||
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;")
|
||||
public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
@ -152,12 +152,11 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
|||
// let's start a node
|
||||
cluster.simAddNode();
|
||||
}
|
||||
cluster.getTimeSource().sleep(10000);
|
||||
// do this in advance if missing
|
||||
if (!cluster.getSimClusterStateProvider().simListCollections().contains(CollectionAdminParams.SYSTEM_COLL)) {
|
||||
cluster.getSimClusterStateProvider().createSystemCollection();
|
||||
CloudTestUtils.waitForState(cluster, CollectionAdminParams.SYSTEM_COLL, 120, TimeUnit.SECONDS,
|
||||
CloudTestUtils.clusterShape(1, 1, false, true));
|
||||
}
|
||||
CloudTestUtils.clusterShape(1, 2, false, true));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -661,6 +660,9 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
|
|||
fail("The TriggerAction should have been created by now");
|
||||
}
|
||||
|
||||
// wait for the trigger to run at least once
|
||||
cluster.getTimeSource().sleep(2 * waitForSeconds * 1000);
|
||||
|
||||
// add node to generate the event
|
||||
String newNode = cluster.simAddNode();
|
||||
boolean await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
|
||||
|
|
Loading…
Reference in New Issue