SOLR:10822: Share a Policy.session object between multiple collection admin calls to get the correct computations

This commit is contained in:
Noble Paul 2017-08-15 16:06:35 +09:30
parent 64c480f3e9
commit 91446f7f77
10 changed files with 425 additions and 225 deletions

View File

@ -179,6 +179,8 @@ Other Changes
* SOLR-11195: Require class attribute for shard and cluster metric reporter configuration. (Christine Poerschke)
* SOLR:10822: Share a Policy.session object between multiple collection admin calls to get the correct computations (noble)
================== 7.0.0 ==================
Versions of Major Components

View File

@ -24,9 +24,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -46,7 +49,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
@ -95,13 +97,14 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
final Long policyVersionBefore = PolicyHelper.REF_VERSION.get();
AtomicLong policyVersionAfter = new AtomicLong(-1);
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
if (CreateShardCmd.usePolicyFramework(coll, ocmh)) {
if (node == null) {
if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
node = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
ocmh.zkStateReader,
node = Assign.identifyNodes(ocmh,
clusterState,
Collections.emptyList(),
collection,
@ -111,6 +114,9 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
replicaType == Replica.Type.TLOG ? 0 : 1,
replicaType == Replica.Type.PULL ? 0 : 1
).get(0).node;
if (policyVersionBefore == null && PolicyHelper.REF_VERSION.get() != null) {
policyVersionAfter.set(PolicyHelper.REF_VERSION.get());
}
}
} else {
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
@ -206,6 +212,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
if (policyVersionAfter.get() > -1) {
PolicyHelper.REF_VERSION.remove();
ocmh.policySessionRef.decref(policyVersionAfter.get());
}
if (onComplete != null) onComplete.run();
};

View File

@ -28,7 +28,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
@ -236,8 +235,7 @@ public class Assign {
return nodeList;
}
public static List<ReplicaPosition> identifyNodes(Supplier<CoreContainer> coreContainer,
ZkStateReader zkStateReader,
public static List<ReplicaPosition> identifyNodes(OverseerCollectionMessageHandler ocmh,
ClusterState clusterState,
List<String> nodeList,
String collectionName,
@ -248,7 +246,7 @@ public class Assign {
int numPullReplicas) throws KeeperException, InterruptedException {
List<Map> rulesMap = (List) message.get("rule");
String policyName = message.getStr(POLICY);
AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
AutoScalingConfig autoScalingConfig = ocmh.zkStateReader.getAutoScalingConfig();
if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
log.debug("Identify nodes using default");
@ -275,8 +273,16 @@ public class Assign {
if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
if (message.getStr(CREATE_NODE_SET) == null)
nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
synchronized (ocmh) {
PolicyHelper.SESSION_REF.set(ocmh.policySessionRef);
try {
return getPositionsUsingPolicy(collectionName,
shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, zkStateReader, nodeList);
shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.zkStateReader, nodeList);
} finally {
PolicyHelper.SESSION_REF.remove();
}
}
} else {
log.debug("Identify nodes using rules framework");
List<Rule> rules = new ArrayList<>();
@ -289,7 +295,7 @@ public class Assign {
(List<Map>) message.get(SNITCH),
new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
nodeList,
coreContainer.get(),
ocmh.overseer.getZkController().getCoreContainer(),
clusterState);
Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();

View File

@ -30,6 +30,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
@ -196,8 +197,7 @@ public class CreateCollectionCmd implements Cmd {
}
}
replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
ocmh.zkStateReader
replicaPositions = Assign.identifyNodes(ocmh
, clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
}
@ -331,6 +331,8 @@ public class CreateCollectionCmd implements Cmd {
throw ex;
} catch (Exception ex) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
} finally {
PolicyHelper.clearFlagAndDecref(ocmh.policySessionRef);
}
}
String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {

View File

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -87,10 +88,11 @@ public class CreateShardCmd implements Cmd {
ZkStateReader zkStateReader = ocmh.zkStateReader;
boolean usePolicyFramework = usePolicyFramework(collection,ocmh);
List<ReplicaPosition> positions = null;
CountDownLatch countDownLatch;
try {
if (usePolicyFramework) {
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
positions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
zkStateReader,
positions = Assign.identifyNodes(ocmh,
clusterState,
Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM),
collectionName,
@ -109,7 +111,7 @@ public class CreateShardCmd implements Cmd {
Replica.Type.PULL, numPullReplicas
).entrySet()) {
for (int j = 0; j < e.getValue(); j++) {
positions.add(new ReplicaPosition(sliceName, j+1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
i++;
}
}
@ -119,7 +121,7 @@ public class CreateShardCmd implements Cmd {
ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
CountDownLatch countDownLatch = new CountDownLatch(totalReplicas);
countDownLatch = new CountDownLatch(totalReplicas);
for (ReplicaPosition position : positions) {
String nodeName = position.node;
String coreName = Assign.buildCoreName(ocmh.zkStateReader.getZkClient(), collection, sliceName, position.type);
@ -158,6 +160,9 @@ public class CreateShardCmd implements Cmd {
}
});
}
} finally {
PolicyHelper.clearFlagAndDecref(ocmh.policySessionRef);
}
log.debug("Waiting for create shard action to complete");
countDownLatch.await(5, TimeUnit.MINUTES);

View File

@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@ -973,6 +974,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
);
}
public final PolicyHelper.SessionRef policySessionRef = new PolicyHelper.SessionRef();
@Override
public void close() throws IOException {
if (tpe != null) {

View File

@ -32,7 +32,9 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.SolrException;
@ -214,12 +216,12 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
List<String> sliceNames = new ArrayList<>();
restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
ocmh.zkStateReader, clusterState,
try {
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
ocmh, clusterState,
nodeList, restoreCollectionName,
message, sliceNames,
numNrtReplicas, numTlogReplicas, numPullReplicas);
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
@ -233,7 +235,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
} else if (numTlogReplicas >= 1) {
propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
}
@ -343,6 +345,9 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
}
log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
} finally {
PolicyHelper.clearFlagAndDecref(ocmh.policySessionRef);
}
}
private int getInt(ZkNodeProps message, String propertyName, Integer default1, int default2) {

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.OverseerAction;
@ -381,8 +382,8 @@ public class SplitShardCmd implements Cmd {
// TODO: change this to handle sharding a slice into > 2 sub-shards.
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(() -> ocmh.overseer.getZkController().getCoreContainer(),
ocmh.zkStateReader, clusterState,
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh,
clusterState,
new ArrayList<>(clusterState.getLiveNodes()),
collectionName,
new ZkNodeProps(collection.getProperties()),
@ -504,6 +505,8 @@ public class SplitShardCmd implements Cmd {
} catch (Exception e) {
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
} finally {
PolicyHelper.clearFlagAndDecref(ocmh.policySessionRef);
}
}
}

View File

@ -21,22 +21,24 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Utils;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
public class PolicyHelper {
private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
ClusterDataProvider cdp,
Map<String, String> optionalPolicyMapping,
@ -46,7 +48,6 @@ public class PolicyHelper {
int pullReplicas,
List<String> nodesList) {
List<ReplicaPosition> positions = new ArrayList<>();
if (optionalPolicyMapping != null) {
final ClusterDataProvider delegate = cdp;
cdp = new ClusterDataProvider() {
@Override
@ -66,14 +67,19 @@ public class PolicyHelper {
@Override
public String getPolicyNameByCollection(String coll) {
return optionalPolicyMapping.containsKey(coll) ?
return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?
optionalPolicyMapping.get(coll) :
delegate.getPolicyNameByCollection(coll);
}
};
}
Policy.Session session = autoScalingConfig.getPolicy().createSession(cdp);
policyMapping.set(optionalPolicyMapping);
Policy.Session session = null;
try {
session = SESSION_REF.get() != null ?
SESSION_REF.get().initOrGet(cdp, autoScalingConfig.getPolicy()) :
autoScalingConfig.getPolicy().createSession(cdp);
Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
typeVsCount.put(Replica.Type.NRT, nrtReplicas);
typeVsCount.put(Replica.Type.TLOG, tlogReplicas);
@ -93,14 +99,95 @@ public class PolicyHelper {
}
SolrRequest op = suggester.getOperation();
if (op == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules " + Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules " +
Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));
}
session = suggester.getSession();
positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
}
}
}
} finally {
if (session != null && SESSION_REF.get() != null) SESSION_REF.get().updateSession(session);
policyMapping.remove();
}
return positions;
}
public static final int SESSION_EXPIRY = 180;//3 seconds
public static ThreadLocal<Long> REF_VERSION = new ThreadLocal<>();
public static class SessionRef {
private final AtomicLong myVersion = new AtomicLong(0);
AtomicInteger refCount = new AtomicInteger();
private Policy.Session session;
long lastUsedTime;
public SessionRef() {
}
public long getRefVersion(){
return myVersion.get();
}
public void decref(long version) {
synchronized (SessionRef.class) {
if (session == null) return;
if(myVersion.get() != version) return;
if (refCount.decrementAndGet() <= 0) {
session = null;
lastUsedTime = 0;
}
}
}
public int getRefCount() {
return refCount.get();
}
public Policy.Session get() {
synchronized (SessionRef.class) {
if (session == null) return null;
if (TimeUnit.SECONDS.convert(System.nanoTime() - lastUsedTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
session = null;
return null;
} else {
REF_VERSION.set(myVersion.get());
refCount.incrementAndGet();
return session;
}
}
}
public Policy.Session initOrGet(ClusterDataProvider cdp, Policy policy) {
synchronized (SessionRef.class) {
Policy.Session session = get();
if (session != null) return session;
this.session = policy.createSession(cdp);
myVersion.incrementAndGet();
lastUsedTime = System.nanoTime();
REF_VERSION.set(myVersion.get());
refCount.set(1);
return this.session;
}
}
private void updateSession(Policy.Session session) {
this.session = session;
lastUsedTime = System.nanoTime();
}
}
public static void clearFlagAndDecref(SessionRef policySessionRef) {
Long refVersion = REF_VERSION.get();
if (refVersion != null) policySessionRef.decref(refVersion);
REF_VERSION.remove();
}
public static ThreadLocal<SessionRef> SESSION_REF = new ThreadLocal<>();
}

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
@ -40,6 +41,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import org.junit.Test;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
@ -832,7 +834,82 @@ public class TestPolicy extends SolrTestCaseJ4 {
.getOperation();
assertNotNull(opReq);
assertEquals("node5", opReq.getParams().get("targetNode"));
}
@Test
public void testSessionCaching() {
PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
String autoScalingjson = " '{cluster-policy':[" +
" { 'cores':'<10', 'node':'#ANY'}," +
" { 'replica':'<2', 'shard':'#EACH', 'node':'#ANY'}," +
" { 'nodeRole':'overseer','replica':0}]," +
" 'cluster-preferences':[{'minimize':'cores'}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
PolicyHelper.SESSION_REF.set(ref1);
String nodeValues = " {" +
" 'node4':{" +
" 'node':'10.0.0.4:8987_solr'," +
" 'cores':1," +
" 'freedisk':884.7097854614258}," +
" 'node3':{" +
" 'node':'10.0.0.4:8989_solr'," +
" 'cores':1," +
" 'freedisk':884.7097854614258}," +
" 'node2':{" +
" 'node':'10.0.0.4:7574_solr'," +
" 'cores':1," +
" 'freedisk':884.7097854614258}," +
"}";
ClusterDataProvider provider = getClusterDataProvider((Map<String, Map>) Utils.fromJSONString(nodeValues), clusterState);
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
" { 'minimize': 'cores', 'precision': 50}" +
" ]," +
" 'cluster-policy': [" +
" { 'replica': 0, 'nodeRole': 'overseer'}" +
" { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
" ]" +
"}");
AutoScalingConfig config = new AutoScalingConfig(policies);
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, provider, null,
Arrays.asList("s1", "s2"), 1, 0, 0,
null);
long sessionRefVersion = PolicyHelper.REF_VERSION.get();
PolicyHelper.SessionRef ref1Copy = PolicyHelper.SESSION_REF.get();
PolicyHelper.SESSION_REF.remove();
Policy.Session session = ref1Copy.get();
assertNotNull(session);
assertEquals(ref1, ref1Copy);
assertTrue(session.getPolicy() == config.getPolicy());
ref1Copy.decref(sessionRefVersion);
PolicyHelper.SESSION_REF.set(ref1);
AutoScalingConfig config2 = new AutoScalingConfig(policies);
locations = PolicyHelper.getReplicaLocations("c2", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
null);
sessionRefVersion = PolicyHelper.REF_VERSION.get();
ref1Copy = PolicyHelper.SESSION_REF.get();
PolicyHelper.SESSION_REF.remove();
session = ref1Copy.get();
ref1Copy.decref(sessionRefVersion);
assertEquals(ref1, ref1Copy);
assertFalse(session.getPolicy() == config2.getPolicy());
assertTrue(session.getPolicy() == config.getPolicy());
assertEquals(2, ref1Copy.getRefCount());
ref1.decref(sessionRefVersion);//decref 1
ref1.decref(sessionRefVersion);//decref 2
PolicyHelper.SESSION_REF.set(ref1);
locations = PolicyHelper.getReplicaLocations("c3", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
null);
sessionRefVersion = PolicyHelper.REF_VERSION.get();
ref1Copy = PolicyHelper.SESSION_REF.get();
PolicyHelper.SESSION_REF.remove();
session = ref1Copy.get();
ref1Copy.decref(sessionRefVersion);
assertTrue(session.getPolicy() == config2.getPolicy());
}