SOLR-11669: Policy Session lifecycle cleanup

This commit is contained in:
Noble Paul 2017-12-06 15:04:04 +11:00
parent e84cce8ea1
commit 071d9270d5
14 changed files with 468 additions and 152 deletions

View File

@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
@ -101,8 +101,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
final Long policyVersionBefore = PolicyHelper.REF_VERSION.get();
AtomicLong policyVersionAfter = new AtomicLong(-1);
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
if (CreateShardCmd.usePolicyFramework(coll, ocmh)) {
@ -118,9 +118,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
replicaType == Replica.Type.TLOG ? 0 : 1,
replicaType == Replica.Type.PULL ? 0 : 1
).get(0).node;
if (policyVersionBefore == null && PolicyHelper.REF_VERSION.get() != null) {
policyVersionAfter.set(PolicyHelper.REF_VERSION.get());
}
sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
}
} else {
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
@ -220,9 +218,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
if (policyVersionAfter.get() > -1) {
PolicyHelper.REF_VERSION.remove();
PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()).decref(policyVersionAfter.get());
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
if (onComplete != null) onComplete.run();
};

View File

@ -293,15 +293,8 @@ public class Assign {
} else {
if (message.getStr(CREATE_NODE_SET) == null)
nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
synchronized (PolicyHelper.class) {
PolicyHelper.SESSION_REF.set(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
try {
return getPositionsUsingPolicy(collectionName,
shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
} finally {
PolicyHelper.SESSION_REF.remove();
}
}
}
}

View File

@ -103,6 +103,7 @@ public class CreateCollectionCmd implements Cmd {
}
ocmh.validateConfigOrThrowSolrException(configName);
PolicyHelper.SessionWrapper sessionWrapper = null;
try {
// look at the replication factor and see if it matches reality
@ -184,6 +185,7 @@ public class CreateCollectionCmd implements Cmd {
}
replicaPositions = Assign.identifyNodes(ocmh
, clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
}
ZkStateReader zkStateReader = ocmh.zkStateReader;
@ -318,7 +320,7 @@ public class CreateCollectionCmd implements Cmd {
} catch (Exception ex) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
} finally {
PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
if(sessionWrapper != null) sessionWrapper.release();
}
}

View File

@ -88,6 +88,7 @@ public class CreateShardCmd implements Cmd {
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
ZkStateReader zkStateReader = ocmh.zkStateReader;
PolicyHelper.SessionWrapper sessionWrapper = null;
boolean usePolicyFramework = usePolicyFramework(collection,ocmh);
List<ReplicaPosition> positions = null;
SolrCloseableLatch countDownLatch;
@ -103,6 +104,7 @@ public class CreateShardCmd implements Cmd {
numNrtReplicas,
numTlogReplicas,
numPullReplicas);
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
} else {
List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
createNodeSetStr, ocmh.overseer.getSolrCloudManager());
@ -164,7 +166,7 @@ public class CreateShardCmd implements Cmd {
});
}
} finally {
PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
if(sessionWrapper != null) sessionWrapper.release();
}
log.debug("Waiting for create shard action to complete");

View File

@ -219,6 +219,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
List<String> sliceNames = new ArrayList<>();
restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
PolicyHelper.SessionWrapper sessionWrapper = null;
try {
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
@ -226,6 +227,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
nodeList, restoreCollectionName,
message, sliceNames,
numNrtReplicas, numTlogReplicas, numPullReplicas);
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
@ -350,7 +352,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
} finally {
PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
if (sessionWrapper != null) sessionWrapper.release();
}
}

View File

@ -92,6 +92,8 @@ public class SplitShardCmd implements Cmd {
DocCollection collection = clusterState.getCollection(collectionName);
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
PolicyHelper.SessionWrapper sessionWrapper = null;
Slice parentSlice;
@ -392,6 +394,7 @@ public class SplitShardCmd implements Cmd {
collectionName,
new ZkNodeProps(collection.getProperties()),
subSlices, repFactor - 1, 0, 0);
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
@ -513,7 +516,7 @@ public class SplitShardCmd implements Cmd {
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
} finally {
PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
if (sessionWrapper != null) sessionWrapper.release();
}
}
}

View File

@ -84,7 +84,8 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
executeAll(requests);
Policy.Session session = autoScalingConfig.getPolicy().createSession(ocmh.overseer.getSolrCloudManager());
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager());
Policy.Session session = sessionWrapper.get();
for (; ; ) {
Suggester suggester = session.getSuggester(MOVEREPLICA)
.hint(Suggester.Hint.TARGET_NODE, nodeName);
@ -96,9 +97,12 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
REPLICA_PROP, request.getParams().get(REPLICA_PROP),
ASYNC, request.getParams().get(ASYNC)));
}
sessionWrapper.returnSession(session);
try {
executeAll(requests);
} finally {
sessionWrapper.release();
}
}
private void executeAll(List<ZkNodeProps> requests) throws Exception {

View File

@ -28,11 +28,12 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,8 +57,13 @@ public class ComputePlanAction extends TriggerActionBase {
if (autoScalingConf.isEmpty()) {
throw new Exception("Action: " + getName() + " executed but no policy is configured");
}
Policy policy = autoScalingConf.getPolicy();
Policy.Session session = policy.createSession(cloudManager);
// Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
// return new PolicyHelper.SessionWrapper(session, null);
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
Policy.Session session = sessionWrapper.get();
// Policy policy = autoScalingConf.getPolicy();
try {
Suggester suggester = getSuggester(session, event, cloudManager);
while (true) {
SolrRequest operation = suggester.getSuggestion();
@ -73,11 +79,20 @@ public class ComputePlanAction extends TriggerActionBase {
session = suggester.getSession();
suggester = getSuggester(session, event, cloudManager);
}
} finally {
releasePolicySession(sessionWrapper, session);
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception while processing event: " + event, e); }
}
private void releasePolicySession(PolicyHelper.SessionWrapper sessionWrapper, Policy.Session session) {
sessionWrapper.returnSession(session);
sessionWrapper.release();
}
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
Suggester suggester;
switch (event.getEventType()) {

View File

@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;")
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper=DEBUG")
public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

View File

@ -30,6 +30,7 @@ import org.apache.solr.common.util.ObjectCache;
*/
public class DelegatingCloudManager implements SolrCloudManager {
private final SolrCloudManager delegate;
private ObjectCache objectCache = new ObjectCache();
public DelegatingCloudManager(SolrCloudManager delegate) {
this.delegate = delegate;
@ -57,7 +58,7 @@ public class DelegatingCloudManager implements SolrCloudManager {
@Override
public ObjectCache getObjectCache() {
return delegate.getObjectCache();
return delegate == null ? objectCache : delegate.getObjectCache();
}
@Override

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
public class DelegatingDistribStateManager implements DistribStateManager {
private final DistribStateManager delegate;
public DelegatingDistribStateManager(DistribStateManager delegate) {
this.delegate = delegate;
}
@Override
public boolean hasData(String path) throws IOException, KeeperException, InterruptedException {
return delegate.hasData(path);
}
@Override
public List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
return delegate.listData(path);
}
@Override
public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
return delegate.getData(path, watcher);
}
@Override
public VersionedData getData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
return delegate.getData(path);
}
@Override
public void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
delegate.makePath(path);
}
@Override
public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
return delegate.createData(path, data, mode);
}
@Override
public void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
delegate.removeData(path, version);
}
@Override
public void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException {
delegate.setData(path, data, version);
}
@Override
public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
return delegate.multi(ops);
}
@Override
public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException {
return delegate.getAutoScalingConfig(watcher);
}
@Override
public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
return delegate.getAutoScalingConfig();
}
}

View File

@ -18,13 +18,14 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
@ -36,12 +37,19 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
import static org.apache.solr.common.util.Utils.time;
import static org.apache.solr.common.util.Utils.timeElapsed;
public class PolicyHelper {
private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
SolrCloudManager cloudManager,
Map<String, String> optionalPolicyMapping,
@ -64,14 +72,33 @@ public class PolicyHelper {
public ClusterStateProvider getClusterStateProvider() {
return stateProvider;
}
@Override
public DistribStateManager getDistribStateManager() {
if (autoScalingConfig != null) {
return new DelegatingDistribStateManager(null) {
@Override
public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
return autoScalingConfig;
}
};
} else {
return super.getDistribStateManager();
}
}
};
policyMapping.set(optionalPolicyMapping);
SessionWrapper sessionWrapper = null;
Policy.Session session = null;
try {
session = SESSION_REF.get() != null ?
SESSION_REF.get().initOrGet(delegatingManager, autoScalingConfig.getPolicy()) :
autoScalingConfig.getPolicy().createSession(delegatingManager);
try {
SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
}
session = sessionWrapper.session;
Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
typeVsCount.put(Replica.Type.NRT, nrtReplicas);
@ -100,15 +127,16 @@ public class PolicyHelper {
}
}
} finally {
if (session != null && SESSION_REF.get() != null) SESSION_REF.get().updateSession(session);
policyMapping.remove();
if (sessionWrapper != null) {
sessionWrapper.returnSession(session);
}
}
return positions;
}
public static final int SESSION_EXPIRY = 180;//3 seconds
public static ThreadLocal<Long> REF_VERSION = new ThreadLocal<>();
public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
Policy.Session session = policy.createSession(cloudManager);
@ -155,80 +183,220 @@ public class PolicyHelper {
return suggestionCtx.getSuggestions();
}
public static class SessionRef {
private final AtomicLong myVersion = new AtomicLong(0);
AtomicInteger refCount = new AtomicInteger();
private Policy.Session session;
long lastUsedTime;
public enum Status {
NULL,
//it is just created and not yet used or all operations on it has been competed fully
UNUSED,
COMPUTING, EXECUTING;
}
/**
* This class stores a session for sharing purpose. If a process creates a session to
* compute operations,
* 1) see if there is a session that is available in the cache,
* 2) if yes, check if it is expired
* 3) if it is expired, create a new session
* 4) if it is not expired, borrow it
* 5) after computing operations put it back in the cache
*/
static class SessionRef {
private final Object lockObj = new Object();
private SessionWrapper sessionWrapper = SessionWrapper.DEF_INST;
public SessionRef() {
}
public long getRefVersion(){
return myVersion.get();
//only for debugging
SessionWrapper getSessionWrapper() {
return sessionWrapper;
}
/**
* All operations suggested by the current session object
* is complete. Do not even cache anything
*
*/
private void release(SessionWrapper sessionWrapper) {
synchronized (lockObj) {
if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
log.debug("session set to NULL");
this.sessionWrapper = SessionWrapper.DEF_INST;
} // else somebody created a new session b/c of expiry . So no need to do anything about it
}
}
/**
* Computing is over for this session and it may contain a new session with new state
* The session can be used by others while the caller is performing operations
*
*/
private void returnSession(SessionWrapper sessionWrapper) {
synchronized (lockObj) {
sessionWrapper.status = Status.EXECUTING;
log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(MILLISECONDS),
sessionWrapper.createTime,
this.sessionWrapper.createTime);
if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
//this session was used for computing new operations and this can now be used for other
// computing
this.sessionWrapper = sessionWrapper;
//one thread who is waiting for this need to be notified.
lockObj.notify();
} else {
log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime);
//else just ignore it
}
}
}
public void decref(long version) {
synchronized (SessionRef.class) {
if (session == null) return;
if(myVersion.get() != version) return;
if (refCount.decrementAndGet() <= 0) {
session = null;
lastUsedTime = 0;
public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
synchronized (lockObj) {
if (sessionWrapper.status == Status.NULL ||
TimeUnit.SECONDS.convert(System.nanoTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
//no session available or the session is expired
return createSession(cloudManager);
} else {
long waitStart = time(MILLISECONDS);
//the session is not expired
log.debug("reusing a session {}", this.sessionWrapper.createTime);
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
this.sessionWrapper.status = Status.COMPUTING;
return sessionWrapper;
} else {
//status= COMPUTING it's being used for computing. computing is
log.debug("session being used. waiting... current time {} ", time(MILLISECONDS));
try {
lockObj.wait(10 * 1000);//wait for a max of 10 seconds
} catch (InterruptedException e) {
log.info("interrupted... ");
}
log.debug("out of waiting curr-time:{} time-elapsed {}", time(MILLISECONDS), timeElapsed(waitStart, MILLISECONDS));
// now this thread has woken up because it got timed out after 10 seconds or it is notified after
//the session was returned from another COMPUTING operation
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
log.debug("Wait over. reusing the existing session ");
this.sessionWrapper.status = Status.COMPUTING;
return sessionWrapper;
} else {
//create a new Session
return createSession(cloudManager);
}
}
}
}
}
private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
synchronized (lockObj) {
log.debug("Creating a new session");
Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
log.debug("New session created ");
this.sessionWrapper = new SessionWrapper(session, this);
this.sessionWrapper.status = Status.COMPUTING;
return sessionWrapper;
}
}
}
/**
* How to get a shared Policy Session
* 1) call {@link #getSession(SolrCloudManager)}
* 2) compute all suggestions
* 3) call {@link SessionWrapper#returnSession(Policy.Session)}
* 4) perform all suggestions
* 5) call {@link SessionWrapper#release()}
*/
public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
SessionRef sessionRef = (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
return sessionRef.get(cloudManager);
}
/**
* Use this to get the last used session wrapper in this thread
*
* @param clear whether to unset the threadlocal or not
*/
public static SessionWrapper getLastSessionWrapper(boolean clear) {
SessionWrapper wrapper = SESSION_WRAPPPER_REF.get();
if (clear) SESSION_WRAPPPER_REF.remove();
return wrapper;
}
static ThreadLocal<SessionWrapper> SESSION_WRAPPPER_REF = new ThreadLocal<>();
public static class SessionWrapper {
public static final SessionWrapper DEF_INST = new SessionWrapper(null, null);
static {
DEF_INST.status = Status.NULL;
DEF_INST.createTime = -1l;
DEF_INST.lastUpdateTime = -1l;
}
private long createTime;
private long lastUpdateTime;
private Policy.Session session;
public Status status;
private final SessionRef ref;
private AtomicInteger refCount = new AtomicInteger();
public long getCreateTime() {
return createTime;
}
public long getLastUpdateTime() {
return lastUpdateTime;
}
public SessionWrapper(Policy.Session session, SessionRef ref) {
lastUpdateTime = createTime = System.nanoTime();
this.session = session;
this.status = Status.UNUSED;
this.ref = ref;
}
public Policy.Session get() {
return session;
}
public SessionWrapper update(Policy.Session session) {
this.lastUpdateTime = System.nanoTime();
this.session = session;
return this;
}
public int getRefCount() {
return refCount.get();
}
public Policy.Session get() {
synchronized (SessionRef.class) {
if (session == null) return null;
if (TimeUnit.SECONDS.convert(System.nanoTime() - lastUsedTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
session = null;
return null;
} else {
REF_VERSION.set(myVersion.get());
/**
* return this for later use and update the session with the latest state
* ensure that this is done after computing the suggestions
*/
public void returnSession(Policy.Session session) {
this.update(session);
refCount.incrementAndGet();
return session;
}
}
}
public Policy.Session initOrGet(SolrCloudManager cloudManager, Policy policy) {
synchronized (SessionRef.class) {
Policy.Session session = get();
if (session != null) return session;
this.session = policy.createSession(cloudManager);
myVersion.incrementAndGet();
lastUsedTime = System.nanoTime();
REF_VERSION.set(myVersion.get());
refCount.set(1);
return this.session;
}
}
private void updateSession(Policy.Session session) {
this.session = session;
lastUsedTime = System.nanoTime();
}
}
public static void clearFlagAndDecref(SessionRef policySessionRef) {
Long refVersion = REF_VERSION.get();
if (refVersion != null) policySessionRef.decref(refVersion);
REF_VERSION.remove();
}
public static PolicyHelper.SessionRef getPolicySessionRef(SolrCloudManager cloudManager){
return (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
}
public static ThreadLocal<SessionRef> SESSION_REF = new ThreadLocal<>();
ref.returnSession(this);
}
//all ops are executed now it can be destroyed
public void release() {
refCount.decrementAndGet();
ref.release(this);
}
}
}

View File

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableSet;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class Utils {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -442,4 +444,13 @@ public class Utils {
}
}
}
public static long time(TimeUnit unit) {
return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
}
public static long timeElapsed(long start, TimeUnit unit) {
return unit.convert(System.nanoTime() - NANOSECONDS.convert(start, unit), NANOSECONDS);
}
}

View File

@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
@ -846,15 +847,15 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Test
public void testSessionCaching() {
PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
public void testSessionCaching() throws IOException, InterruptedException {
// PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
String autoScalingjson = " '{cluster-policy':[" +
" { 'cores':'<10', 'node':'#ANY'}," +
" { 'replica':'<2', 'shard':'#EACH', 'node':'#ANY'}," +
" { 'nodeRole':'overseer','replica':0}]," +
" 'cluster-preferences':[{'minimize':'cores'}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
PolicyHelper.SESSION_REF.set(ref1);
// PolicyHelper.SESSION_REF.set(ref1);
String nodeValues = " {" +
" 'node4':{" +
" 'node':'10.0.0.4:8987_solr'," +
@ -870,7 +871,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'freedisk':884.7097854614258}," +
"}";
SolrCloudManager provider = getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues), clusterState);
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
@ -882,44 +883,70 @@ public class TestPolicy extends SolrTestCaseJ4 {
" ]" +
"}");
AutoScalingConfig config = new AutoScalingConfig(policies);
final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues),
clusterState)) {
@Override
public DistribStateManager getDistribStateManager() {
return delegatingDistribStateManager(config);
}
};
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, provider, null,
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, solrCloudManager, null,
Arrays.asList("s1", "s2"), 1, 0, 0,
null);
long sessionRefVersion = PolicyHelper.REF_VERSION.get();
PolicyHelper.SessionRef ref1Copy = PolicyHelper.SESSION_REF.get();
PolicyHelper.SESSION_REF.remove();
Policy.Session session = ref1Copy.get();
assertNotNull(session);
assertEquals(ref1, ref1Copy);
assertTrue(session.getPolicy() == config.getPolicy());
ref1Copy.decref(sessionRefVersion);
PolicyHelper.SESSION_REF.set(ref1);
AutoScalingConfig config2 = new AutoScalingConfig(policies);
locations = PolicyHelper.getReplicaLocations("c2", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
null);
sessionRefVersion = PolicyHelper.REF_VERSION.get();
ref1Copy = PolicyHelper.SESSION_REF.get();
PolicyHelper.SESSION_REF.remove();
session = ref1Copy.get();
ref1Copy.decref(sessionRefVersion);
assertEquals(ref1, ref1Copy);
assertFalse(session.getPolicy() == config2.getPolicy());
assertTrue(session.getPolicy() == config.getPolicy());
assertEquals(2, ref1Copy.getRefCount());
ref1.decref(sessionRefVersion);//decref 1
ref1.decref(sessionRefVersion);//decref 2
PolicyHelper.SESSION_REF.set(ref1);
locations = PolicyHelper.getReplicaLocations("c3", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
null);
sessionRefVersion = PolicyHelper.REF_VERSION.get();
ref1Copy = PolicyHelper.SESSION_REF.get();
PolicyHelper.SESSION_REF.remove();
session = ref1Copy.get();
ref1Copy.decref(sessionRefVersion);
assertTrue(session.getPolicy() == config2.getPolicy());
PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
assertNotNull(sessionRef);
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
Policy.Session session = sessionWrapper.get();
assertNotNull(session);
assertTrue(session.getPolicy() == config.getPolicy());
assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
sessionWrapper.release();
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
AtomicLong secondTime = new AtomicLong();
Thread thread = new Thread(() -> {
try {
s2[0] = PolicyHelper.getSession(solrCloudManager);
secondTime.set(System.nanoTime());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
thread.start();
Thread.sleep(50);
long beforeReturn = System.nanoTime();
assertEquals(s1.getCreateTime(), sessionRef.getSessionWrapper().getCreateTime());
s1.returnSession(s1.get());
assertEquals(1, s1.getRefCount());
thread.join();
assertNotNull(s2[0]);
assertTrue(secondTime.get() > beforeReturn);
assertTrue(s1.getCreateTime() == s2[0].getCreateTime());
s2[0].returnSession(s2[0].get());
assertEquals(2, s1.getRefCount());
s2[0].release();
assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
s1.release();
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
}
private DistribStateManager delegatingDistribStateManager(AutoScalingConfig config) {
return new DelegatingDistribStateManager(null) {
@Override
public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
return config;
}
};
}
public void testNegativeConditions() {