mirror of https://github.com/apache/lucene.git
SOLR-14347: fix cached session update to not depend on Zookeeper state (#1542)
SOLR-14347: fix cached session update to not depend on Zookeeper state
This commit is contained in:
parent
37a83675a7
commit
d4f7c90bab
|
@ -382,7 +382,7 @@ public class Policy implements MapWriter {
|
|||
}
|
||||
|
||||
public Session createSession(SolrCloudManager cloudManager) {
|
||||
return new Session(cloudManager, this, null);
|
||||
return createSession(cloudManager, null);
|
||||
}
|
||||
|
||||
public Session createSession(SolrCloudManager cloudManager, Transaction tx) {
|
||||
|
@ -550,14 +550,18 @@ public class Policy implements MapWriter {
|
|||
final SolrCloudManager cloudManager;
|
||||
final List<Row> matrix;
|
||||
final NodeStateProvider nodeStateProvider;
|
||||
Set<String> collections = new HashSet<>();
|
||||
final Set<String> collections;
|
||||
final Policy policy;
|
||||
List<Clause> expandedClauses;
|
||||
List<Violation> violations = new ArrayList<>();
|
||||
Transaction transaction;
|
||||
|
||||
|
||||
/**
|
||||
* This constructor creates a Session from the current Zookeeper collection, replica and node states.
|
||||
*/
|
||||
Session(SolrCloudManager cloudManager, Policy policy, Transaction transaction) {
|
||||
collections = new HashSet<>();
|
||||
this.transaction = transaction;
|
||||
this.policy = policy;
|
||||
ClusterState state = null;
|
||||
|
@ -605,37 +609,106 @@ public class Policy implements MapWriter {
|
|||
applyRules();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new Session and updates the Rows in the internal matrix to reference this session.
|
||||
*/
|
||||
private Session(List<String> nodes, SolrCloudManager cloudManager,
|
||||
List<Row> matrix, List<Clause> expandedClauses,
|
||||
List<Row> matrix, Set<String> collections, List<Clause> expandedClauses,
|
||||
NodeStateProvider nodeStateProvider, Policy policy, Transaction transaction) {
|
||||
this.transaction = transaction;
|
||||
this.policy = policy;
|
||||
this.nodes = nodes;
|
||||
this.cloudManager = cloudManager;
|
||||
this.collections = collections;
|
||||
this.matrix = matrix;
|
||||
this.expandedClauses = expandedClauses;
|
||||
this.nodeStateProvider = nodeStateProvider;
|
||||
for (Row row : matrix) row.session = this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a session (this one), creates a new one for placement simulations that retains all the relevant information,
|
||||
* whether or not that info already made it to Zookeeper.
|
||||
*/
|
||||
public Session cloneToNewSession(SolrCloudManager cloudManager) {
|
||||
NodeStateProvider nodeStateProvider = cloudManager.getNodeStateProvider();
|
||||
ClusterStateProvider clusterStateProvider = cloudManager.getClusterStateProvider();
|
||||
|
||||
List<String> nodes = new ArrayList<>(clusterStateProvider.getLiveNodes());
|
||||
|
||||
// Copy all collections from old session, even those not yet in ZK state
|
||||
Set<String> collections = new HashSet<>(this.collections);
|
||||
|
||||
// (shallow) copy the expanded clauses
|
||||
List<Clause> expandedClauses = new ArrayList<>(this.expandedClauses);
|
||||
|
||||
List<Row> matrix = new ArrayList<>(nodes.size());
|
||||
Map<String, Row> copyNodes = new HashMap<>();
|
||||
for (Row oldRow: this.matrix) {
|
||||
copyNodes.put(oldRow.node, oldRow.copy());
|
||||
}
|
||||
for (String node : nodes) {
|
||||
// Do we have a row for that node in this session? If yes, reuse without trying to fetch from cluster state (latest changes might not be there)
|
||||
Row newRow = copyNodes.get(node);
|
||||
if (newRow == null) {
|
||||
// Dealing with a node that doesn't exist in this Session. Need to create related data from scratch.
|
||||
// We pass null for the Session on purpose. The current (this) session is not the correct one for this Row.
|
||||
// The correct session will be set when we build the new Session instance at the end of this method.
|
||||
newRow = new Row(node, this.policy.getParams(), this.policy.getPerReplicaAttributes(), null, nodeStateProvider, cloudManager);
|
||||
// Get info for collections on that node
|
||||
Set<String> collectionsOnNewNode = nodeStateProvider.getReplicaInfo(node, Collections.emptyList()).keySet();
|
||||
collections.addAll(collectionsOnNewNode);
|
||||
|
||||
// Adjust policies to take into account new collections
|
||||
for (String collection : collectionsOnNewNode) {
|
||||
// We pass this.policy but it is not modified so will not impact this session being cloned
|
||||
addClausesForCollection(this.policy, expandedClauses, clusterStateProvider, collection);
|
||||
}
|
||||
}
|
||||
matrix.add(newRow);
|
||||
}
|
||||
|
||||
if (nodes.size() > 0) {
|
||||
//if any collection has 'withCollection' irrespective of the node, the NodeStateProvider returns a map value
|
||||
Map<String, Object> vals = nodeStateProvider.getNodeValues(nodes.get(0), Collections.singleton("withCollection"));
|
||||
if (!vals.isEmpty() && vals.get("withCollection") != null) {
|
||||
Map<String, String> withCollMap = (Map<String, String>) vals.get("withCollection");
|
||||
if (!withCollMap.isEmpty()) {
|
||||
Clause withCollClause = new Clause((Map<String,Object>)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") ,
|
||||
new Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null),
|
||||
new Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true, null, false
|
||||
);
|
||||
expandedClauses.add(withCollClause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Collections.sort(expandedClauses);
|
||||
|
||||
Session newSession = new Session(nodes, cloudManager, matrix, collections, expandedClauses,
|
||||
nodeStateProvider, this.policy, this.transaction);
|
||||
newSession.applyRules();
|
||||
|
||||
return newSession;
|
||||
}
|
||||
|
||||
void addClausesForCollection(ClusterStateProvider stateProvider, String collection) {
|
||||
addClausesForCollection(policy, expandedClauses, stateProvider, collection);
|
||||
}
|
||||
|
||||
public static void addClausesForCollection(Policy policy, List<Clause> clauses, ClusterStateProvider stateProvider, String c) {
|
||||
String p = stateProvider.getPolicyNameByCollection(c);
|
||||
public static void addClausesForCollection(Policy policy, List<Clause> clauses, ClusterStateProvider stateProvider, String collectionName) {
|
||||
String p = stateProvider.getPolicyNameByCollection(collectionName);
|
||||
if (p != null) {
|
||||
List<Clause> perCollPolicy = policy.getPolicies().get(p);
|
||||
if (perCollPolicy == null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
clauses.addAll(mergePolicies(c, policy.getPolicies().getOrDefault(p, emptyList()), policy.getClusterPolicy()));
|
||||
clauses.addAll(mergePolicies(collectionName, policy.getPolicies().getOrDefault(p, emptyList()), policy.getClusterPolicy()));
|
||||
}
|
||||
|
||||
Session copy() {
|
||||
return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, nodeStateProvider, policy, transaction);
|
||||
return new Session(nodes, cloudManager, getMatrixCopy(), new HashSet<>(), expandedClauses, nodeStateProvider, policy, transaction);
|
||||
}
|
||||
|
||||
public Row getNode(String node) {
|
||||
|
@ -645,7 +718,7 @@ public class Policy implements MapWriter {
|
|||
|
||||
List<Row> getMatrixCopy() {
|
||||
return matrix.stream()
|
||||
.map(row -> row.copy(this))
|
||||
.map(row -> row.copy())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
|
|
@ -121,17 +121,21 @@ public class PolicyHelper {
|
|||
|
||||
policyMapping.set(optionalPolicyMapping);
|
||||
SessionWrapper sessionWrapper = null;
|
||||
Policy.Session origSession = null;
|
||||
|
||||
try {
|
||||
try {
|
||||
SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
|
||||
|
||||
}
|
||||
origSession = sessionWrapper.session;
|
||||
|
||||
Policy.Session origSession = sessionWrapper.session;
|
||||
// new session needs to be created to avoid side-effects from per-collection policies
|
||||
Policy.Session session = new Policy.Session(delegatingManager, origSession.policy, origSession.transaction);
|
||||
// TODO: refactor so cluster state cache is separate from storage of policies to avoid per cluster vs per collection interactions
|
||||
// Need a Session that has all previous history of the original session, NOT filtered by what's present or not in Zookeeper
|
||||
// (as does constructor Session(SolrCloudManager, Policy, Transaction)).
|
||||
Policy.Session newSession = origSession.cloneToNewSession(delegatingManager);
|
||||
|
||||
Map<String, Double> diskSpaceReqd = new HashMap<>();
|
||||
try {
|
||||
DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collName);
|
||||
|
@ -165,7 +169,7 @@ public class PolicyHelper {
|
|||
int idx = 0;
|
||||
for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
|
||||
for (int i = 0; i < e.getValue(); i++) {
|
||||
Suggester suggester = session.getSuggester(ADDREPLICA)
|
||||
Suggester suggester = newSession.getSuggester(ADDREPLICA)
|
||||
.hint(Hint.REPLICATYPE, e.getKey())
|
||||
.hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
|
||||
if (nodesList != null) {
|
||||
|
@ -184,18 +188,23 @@ public class PolicyHelper {
|
|||
, handleExp(log, "", () -> Utils.writeJson(getDiagnostics(sessionCopy), new StringWriter(), true).toString())); // logOk
|
||||
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, " No node can satisfy the rules " +
|
||||
Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true) + " More details from logs in node : "
|
||||
Utils.toJSONString(Utils.getDeepCopy(newSession.expandedClauses, 4, true) + " More details from logs in node : "
|
||||
+ Utils.getMDCNode() + ", errorId : " + errorId));
|
||||
}
|
||||
session = suggester.getSession();
|
||||
newSession = suggester.getSession();
|
||||
positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We're happy with the updated session based on the original one, so let's update what the wrapper would hand
|
||||
// to the next computation that wants a session.
|
||||
sessionWrapper.update(newSession);
|
||||
} finally {
|
||||
policyMapping.remove();
|
||||
// We mark the wrapper (and its session) as being available to others.
|
||||
if (sessionWrapper != null) {
|
||||
sessionWrapper.returnSession(origSession);
|
||||
sessionWrapper.returnSession();
|
||||
}
|
||||
}
|
||||
return positions;
|
||||
|
@ -600,16 +609,21 @@ public class PolicyHelper {
|
|||
*/
|
||||
public void returnSession(Policy.Session session) {
|
||||
this.update(session);
|
||||
this.returnSession();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return this for later use without updating the internal Session, for cases where it's easier to update separately.
|
||||
*/
|
||||
public void returnSession() {
|
||||
refCount.incrementAndGet();
|
||||
ref.returnSession(this);
|
||||
|
||||
}
|
||||
|
||||
// All ops are executed now, so it can be destroyed
|
||||
public void release() {
|
||||
refCount.decrementAndGet();
|
||||
ref.release(this);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,12 +27,15 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.common.MapWriter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
|
@ -61,14 +64,27 @@ public class Row implements MapWriter {
|
|||
Map perCollCache;
|
||||
|
||||
public Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session) {
|
||||
this(node, params, perReplicaAttributes, session, session.nodeStateProvider, session.cloudManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor that allows explicitly passing a {@link NodeStateProvider} and a {@link SolrCloudManager} in order not to
|
||||
* use those obtained through the passed <code>session</code>.
|
||||
* <p>Note the resulting row has a {@link Policy.Session} that may not be consistent with the rest of the Row's state. When rows are copied
|
||||
* as part of a {@link Policy.Session} copy, the copied rows' sessions are eventually updated in
|
||||
* {@link org.apache.solr.client.solrj.cloud.autoscaling.Policy.Session#Session(List, SolrCloudManager, List, Set, List, NodeStateProvider, Policy, Policy.Transaction)}
|
||||
* once the new {@link Policy.Session} instance is available.</p>
|
||||
*/
|
||||
Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session,
|
||||
NodeStateProvider nsp, SolrCloudManager cloudManager) {
|
||||
this.session = session;
|
||||
collectionVsShardVsReplicas = session.nodeStateProvider.getReplicaInfo(node, perReplicaAttributes);
|
||||
collectionVsShardVsReplicas = nsp.getReplicaInfo(node, perReplicaAttributes);
|
||||
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
|
||||
this.node = node;
|
||||
cells = new Cell[params.size()];
|
||||
isLive = session.cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
|
||||
isLive = cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
|
||||
List<String> paramNames = params.stream().map(Pair::first).collect(Collectors.toList());
|
||||
Map<String, Object> vals = isLive ? session.nodeStateProvider.getNodeValues(node, paramNames) : Collections.emptyMap();
|
||||
Map<String, Object> vals = isLive ? nsp.getNodeValues(node, paramNames) : Collections.emptyMap();
|
||||
for (int i = 0; i < params.size(); i++) {
|
||||
Pair<String, Variable.Type> pair = params.get(i);
|
||||
cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this);
|
||||
|
@ -80,7 +96,6 @@ public class Row implements MapWriter {
|
|||
isAlreadyCopied = true;
|
||||
}
|
||||
|
||||
|
||||
public static final Map<String, CacheEntry> cacheStats = new HashMap<>();
|
||||
|
||||
static class CacheEntry implements MapWriter {
|
||||
|
@ -175,7 +190,7 @@ public class Row implements MapWriter {
|
|||
ew.put("attributes", Arrays.asList(cells));
|
||||
}
|
||||
|
||||
Row copy(Policy.Session session) {
|
||||
Row copy() {
|
||||
return new Row(node, cells, anyValueMissing, collectionVsShardVsReplicas, isLive, session, this.globalCache, this.perCollCache);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue