SOLR-14462: cache more than one autoscaling session (#1504)

SOLR-14462: cache more than one autoscaling session
This commit is contained in:
Ilan Ginzburg 2020-06-24 22:02:43 +02:00 committed by GitHub
parent f47de19c4e
commit 25428013fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 335 additions and 111 deletions

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -396,33 +397,59 @@ public class PolicyHelper {
}
public enum Status {
NULL,
//it is just created and not yet used or all operations on it has been completed fully
UNUSED,
COMPUTING, EXECUTING
/**
* A command is actively using and modifying the session to compute placements
*/
COMPUTING,
/**
* A command is not done yet processing its changes but no longer updates or even uses the session
*/
EXECUTING
}
/**
* This class stores a session for sharing purpose. If a process creates a session to
* compute operations,
* 1) see if there is a session that is available in the cache,
* 2) if yes, check if it is expired
* 3) if it is expired, create a new session
* 4) if it is not expired, borrow it
* 5) after computing operations put it back in the cache
* This class stores sessions for sharing purposes. If a process requires a session to
* compute operations:
* <ol>
* <li>see if there is an available non expired session in the cache,</li>
* <li>if yes, borrow it.</li>
* <li>if no, create a new one and borrow it.</li>
* <li>after computing (update) operations are done, {@link #returnSession(SessionWrapper)} back to the cache so it's
* again available for borrowing.</li>
* <li>after all borrowers are done computing then executing with the session, {@link #release(SessionWrapper)} it,
* which removes it from the cache.</li>
* </ol>
*/
static class SessionRef {
/**
* Lock protecting access to {@link #sessionWrapperSet} and to {@link #creationsInProgress}
*/
private final Object lockObj = new Object();
private SessionWrapper sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
/**
* Sessions currently in use in {@link Status#COMPUTING} or {@link Status#EXECUTING} states. As soon as all
* uses of a session are over, that session is removed from this set. Sessions not actively in use are NOT kept around.
*
* <p>Access should only be done under the protection of {@link #lockObj}</p>
*/
private Set<SessionWrapper> sessionWrapperSet = Collections.newSetFromMap(new IdentityHashMap<>());
/**
* Number of sessions currently being created but not yet present in {@link #sessionWrapperSet}.
*
* <p>Access should only be done under the protection of {@link #lockObj}</p>
*/
private int creationsInProgress = 0;
public SessionRef() {
}
//only for debugging
SessionWrapper getSessionWrapper() {
return sessionWrapper;
// used only by tests
boolean isEmpty() {
synchronized (lockObj) {
return sessionWrapperSet.isEmpty();
}
}
/**
@ -430,11 +457,19 @@ public class PolicyHelper {
* is complete. Do not even cache anything
*/
private void release(SessionWrapper sessionWrapper) {
boolean present;
synchronized (lockObj) {
if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
log.debug("session set to NULL");
this.sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
} // else somebody created a new session b/c of expiry . So no need to do anything about it
present = sessionWrapperSet.remove(sessionWrapper);
}
if (!present) {
log.warn("released session {} not found in session set", sessionWrapper.getCreateTime());
} else {
if (log.isDebugEnabled()) {
TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
log.debug("final release, session {} lived a total of {}ms, ", sessionWrapper.getCreateTime(),
timeElapsed(timeSource, TimeUnit.MILLISECONDS.convert(sessionWrapper.getCreateTime(),
TimeUnit.NANOSECONDS), MILLISECONDS)); // logOk
}
}
}
@ -443,87 +478,149 @@ public class PolicyHelper {
* The session can be used by others while the caller is performing operations
*/
private void returnSession(SessionWrapper sessionWrapper) {
TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
boolean present;
synchronized (lockObj) {
sessionWrapper.status = Status.EXECUTING;
if (log.isDebugEnabled()) {
log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} "
, time(timeSource, MILLISECONDS),
sessionWrapper.createTime,
this.sessionWrapper.createTime);
}
if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
//this session was used for computing new operations and this can now be used for other
// computing
this.sessionWrapper = sessionWrapper;
present = sessionWrapperSet.contains(sessionWrapper);
//one thread who is waiting for this need to be notified.
lockObj.notify();
} else {
log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
//else just ignore it
}
// wake up single thread waiting for a session return (ok if not woken up, wait is short)
// Important to wake up a single one, otherwise of multiple waiting threads, all but one will immediately create new sessions
lockObj.notify();
}
// Logging
if (present) {
if (log.isDebugEnabled()) {
log.debug("returnSession {}", sessionWrapper.getCreateTime());
}
} else {
log.warn("returning unknown session {} ", sessionWrapper.getCreateTime());
}
}
public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
/**
* <p>Method returning an available session that can be used for {@link Status#COMPUTING}, either from the
* {@link #sessionWrapperSet} cache or by creating a new one. The status of the returned session is set to {@link Status#COMPUTING}.</p>
*
* Some waiting is done in two cases:
* <ul>
* <li>A candidate session is present in {@link #sessionWrapperSet} but is still {@link Status#COMPUTING}, a random wait
* is observed to see if the session gets freed to save a session creation and allow session reuse,</li>
* <li>It is necessary to create a new session but there are already sessions in the process of being created, a
* random wait is observed (if no waiting already occurred waiting for a session to become free) before creation
* takes place, just in case one of the created sessions got used then {@link #returnSession(SessionWrapper)} in the meantime.</li>
* </ul>
*
* The random wait prevents the "thundering herd" effect when all threads needing a session at the same time create a new
* one even though some differentiated waits could have led to better reuse and less session creations.
*
* @param allowWait usually <code>true</code> except in tests that know there's no point in waiting because nothing
* will happen...
*/
public SessionWrapper get(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
TimeSource timeSource = cloudManager.getTimeSource();
long oldestUpdateTimeNs = TimeUnit.SECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS) - SESSION_EXPIRY;
int zkVersion = cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion();
synchronized (lockObj) {
if (sessionWrapper.status == Status.NULL ||
sessionWrapper.zkVersion != cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion() ||
TimeUnit.SECONDS.convert(timeSource.getTimeNs() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
//no session available or the session is expired
return createSession(cloudManager);
} else {
SessionWrapper sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
// Best case scenario: an available session
if (sw != null) {
if (log.isDebugEnabled()) {
log.debug("reusing session {}", sw.getCreateTime());
}
return sw;
}
// Wait for a while before deciding what to do if waiting could help...
if ((creationsInProgress != 0 || hasCandidateSession(zkVersion, oldestUpdateTimeNs)) && allowWait) {
// Either an existing session might be returned and become usable while we wait, or a session in the process of being
// created might finish creation, be used then returned and become usable. So we wait.
// wait 1 to 10 secs. Random to help spread wakeups.
long waitForMs = (long) (Math.random() * 9 * 1000) + 1000;
if (log.isDebugEnabled()) {
log.debug("No sessions are available, all busy COMPUTING (or {} creations in progress). starting wait of {}ms",
creationsInProgress, waitForMs);
}
long waitStart = time(timeSource, MILLISECONDS);
//the session is not expired
log.debug("reusing a session {}", this.sessionWrapper.createTime);
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
this.sessionWrapper.status = Status.COMPUTING;
return sessionWrapper;
} else {
//status= COMPUTING it's being used for computing. computing is
try {
lockObj.wait(waitForMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (log.isDebugEnabled()) {
log.debug("out of waiting. wait of {}ms, actual time elapsed {}ms", waitForMs, timeElapsed(timeSource, waitStart, MILLISECONDS));
}
// We've waited, now we can either reuse immediately an available session, or immediately create a new one
sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
// Second best case scenario: an available session
if (sw != null) {
if (log.isDebugEnabled()) {
log.debug("session being used. waiting... current time {} ", time(timeSource, MILLISECONDS));
}
try {
lockObj.wait(10 * 1000);//wait for a max of 10 seconds
} catch (InterruptedException e) {
log.info("interrupted... ");
}
if (log.isDebugEnabled()) {
log.debug("out of waiting curr-time:{} time-elapsed {}"
, time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
}
// now this thread has woken up because it got timed out after 10 seconds or it is notified after
// the session was returned from another COMPUTING operation
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
log.debug("Wait over. reusing the existing session ");
this.sessionWrapper.status = Status.COMPUTING;
return sessionWrapper;
} else {
//create a new Session
return createSession(cloudManager);
log.debug("reusing session {} after wait", sw.getCreateTime());
}
return sw;
}
}
// We're going to create a new Session OUTSIDE of the critical section because session creation can take quite some time
creationsInProgress++;
}
SessionWrapper newSessionWrapper = null;
try {
if (log.isDebugEnabled()) {
log.debug("Creating a new session");
}
Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
newSessionWrapper = new SessionWrapper(session, this);
if (log.isDebugEnabled()) {
log.debug("New session created, {}", newSessionWrapper.getCreateTime());
}
return newSessionWrapper;
} finally {
synchronized (lockObj) {
creationsInProgress--;
if (newSessionWrapper != null) {
// Session created successfully
sessionWrapperSet.add(newSessionWrapper);
}
}
}
}
private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
synchronized (lockObj) {
log.debug("Creating a new session");
Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
log.debug("New session created ");
this.sessionWrapper = new SessionWrapper(session, this);
this.sessionWrapper.status = Status.COMPUTING;
return sessionWrapper;
/**
* Returns an available session from the cache (the best one once cache strategies are defined), or null if no session
* from the cache is available (i.e. all are still COMPUTING, are too old, wrong zk version or the cache is empty).<p>
* This method must be called while holding the monitor on {@link #lockObj}.<p>
* The method updates the session status to computing.
*/
private SessionWrapper getAvailableSession(int zkVersion, long oldestUpdateTimeNs) {
for (SessionWrapper sw : sessionWrapperSet) {
if (sw.status == Status.EXECUTING && sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
sw.status = Status.COMPUTING;
return sw;
}
}
return null;
}
/**
* Returns true if there's a session in the cache that could be returned (if it was free). This is required to
* know if there's any point in waiting or if a new session should better be created right away.
*/
private boolean hasCandidateSession(int zkVersion, long oldestUpdateTimeNs) {
for (SessionWrapper sw : sessionWrapperSet) {
if (sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
return true;
}
}
return false;
}
}
/**
@ -535,8 +632,12 @@ public class PolicyHelper {
* 5) call {@link SessionWrapper#release()}
*/
public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
return getSession(cloudManager, true);
}
static SessionWrapper getSession(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
SessionRef sessionRef = (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
return sessionRef.get(cloudManager);
return sessionRef.get(cloudManager, allowWait);
}
/**
@ -556,22 +657,21 @@ public class PolicyHelper {
public static class SessionWrapper {
public static final SessionWrapper DEFAULT_INSTANCE = new SessionWrapper(null, null);
static {
DEFAULT_INSTANCE.status = Status.NULL;
DEFAULT_INSTANCE.createTime = -1L;
DEFAULT_INSTANCE.lastUpdateTime = -1L;
}
private long createTime;
private final long createTime;
private long lastUpdateTime;
private Policy.Session session;
public Status status;
private final SessionRef ref;
private AtomicInteger refCount = new AtomicInteger();
/**
* Number of commands currently using the session in {@link Status#EXECUTING}. There is one <b>additional</b> command
* using the session and updating it if {@link #status} is {@link Status#COMPUTING}
*/
private final AtomicInteger refCount = new AtomicInteger();
public final long zkVersion;
/**
* Nanoseconds (since/to some arbitrary time) when the session got created. Also used in logs (only in logs!) to identify the session.
*/
public long getCreateTime() {
return createTime;
}
@ -581,27 +681,22 @@ public class PolicyHelper {
}
public SessionWrapper(Policy.Session session, SessionRef ref) {
lastUpdateTime = createTime = session != null ?
session.cloudManager.getTimeSource().getTimeNs() :
TimeSource.NANO_TIME.getTimeNs();
createTime = session.cloudManager.getTimeSource().getTimeNs();
lastUpdateTime = createTime;
this.session = session;
this.status = Status.UNUSED;
this.status = Status.COMPUTING; // Created for being used, so COMPUTING right away
this.ref = ref;
this.zkVersion = session == null ?
0 :
session.getPolicy().getZkVersion();
this.zkVersion = session.getPolicy().getZkVersion();
}
public Policy.Session get() {
return session;
}
public SessionWrapper update(Policy.Session session) {
this.lastUpdateTime = session != null ?
session.cloudManager.getTimeSource().getTimeNs() :
TimeSource.NANO_TIME.getTimeNs();
public void update(Policy.Session session) {
// JMM multithreaded access issue on lastUpdateTime.
this.lastUpdateTime = session.cloudManager.getTimeSource().getTimeNs();
this.session = session;
return this;
}
public int getRefCount() {
@ -613,6 +708,10 @@ public class PolicyHelper {
* ensure that this is done after computing the suggestions
*/
public void returnSession(Policy.Session session) {
if (this.status != Status.COMPUTING) {
log.warn("returning session {} not in state COMPUTING", this.getCreateTime());
}
this.update(session);
this.returnSession();
}
@ -627,8 +726,9 @@ public class PolicyHelper {
//all ops are executed now it can be destroyed
public void release() {
refCount.decrementAndGet();
ref.release(this);
if (refCount.decrementAndGet() <= 0) {
ref.release(this);
}
}
}
}

View File

@ -37,6 +37,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
@ -1727,9 +1728,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertTrue(session.getPolicy() == config.getPolicy());
assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
sessionWrapper.release();
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
assertTrue(sessionRef.isEmpty());
PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
AtomicLong secondTime = new AtomicLong();
Thread thread = new Thread(() -> {
@ -1743,7 +1743,6 @@ public class TestPolicy extends SolrTestCaseJ4 {
thread.start();
Thread.sleep(50);
long beforeReturn = System.nanoTime();
assertEquals(s1.getCreateTime(), sessionRef.getSessionWrapper().getCreateTime());
s1.returnSession(s1.get());
assertEquals(1, s1.getRefCount());
thread.join();
@ -1755,13 +1754,138 @@ public class TestPolicy extends SolrTestCaseJ4 {
assertEquals(2, s1.getRefCount());
s2[0].release();
assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
assertFalse(sessionRef.isEmpty());
s1.release();
assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
assertTrue(sessionRef.isEmpty());
}
@Test
public void testMultiSessionsCache() throws IOException, InterruptedException {
@SuppressWarnings({"rawtypes", "unchecked"})
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString(" {" +
" 'node1':{ 'node':'10.0.0.4:8987_solr', 'cores':1 }," +
" 'node2':{ 'node':'10.0.0.4:8989_solr', 'cores':1 }," +
" 'node3':{ 'node':'10.0.0.4:7574_solr', 'cores':1 }" +
"}");
@SuppressWarnings({"rawtypes"})
Map policies = (Map) Utils.fromJSONString("{ 'cluster-preferences': [{ 'minimize': 'cores', 'precision': 1}]}");
@SuppressWarnings({"unchecked"})
AutoScalingConfig config = new AutoScalingConfig(policies);
final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager(nodeValues, clusterState)) {
@Override
public DistribStateManager getDistribStateManager() {
return delegatingDistribStateManager(config);
}
};
PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
// Must skip the wait time otherwise test takes a few seconds to run (and s1 is not returned now anyway so no point waiting).
PolicyHelper.SessionWrapper s2 = PolicyHelper.getSession(solrCloudManager, false);
// Got two sessions, they are different
assertNotSame(s1, s2);
// Done COMPUTING with first session, it can be reused
s1.returnSession(s1.get());
PolicyHelper.SessionWrapper s3 = PolicyHelper.getSession(solrCloudManager);
// First session indeed reused when a new session is requested
assertSame(s3, s1);
// Done COMPUTING with second session, it can be reused
s2.returnSession(s2.get());
PolicyHelper.SessionWrapper s4 = PolicyHelper.getSession(solrCloudManager);
// Second session indeed reused when a new session is requested
assertSame(s4, s2);
s4.returnSession(s4.get());
s4.release();
s2.release();
s3.returnSession(s3.get());
s3.release();
PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
// First session not yet released so is still in the cache
assertFalse(sessionRef.isEmpty());
s1.release();
assertTrue(sessionRef.isEmpty());
}
/**
* Verify number of sessions allocated when parallel session requests arrive is reasonable.
* Test takes about 3 seconds to run.
*/
@Test
@Slow
public void testMultiThreadedSessionsCache() throws IOException, InterruptedException {
@SuppressWarnings({"rawtypes", "unchecked"})
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString(" {" +
" 'node1':{ 'node':'10.0.0.4:8987_solr', 'cores':1 }," +
" 'node2':{ 'node':'10.0.0.4:8989_solr', 'cores':1 }," +
" 'node3':{ 'node':'10.0.0.4:7574_solr', 'cores':1 }" +
"}");
@SuppressWarnings({"rawtypes"})
Map policies = (Map) Utils.fromJSONString("{ 'cluster-preferences': [{ 'minimize': 'cores', 'precision': 1}]}");
@SuppressWarnings({"unchecked"})
AutoScalingConfig config = new AutoScalingConfig(policies);
final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager(nodeValues, clusterState)) {
@Override
public DistribStateManager getDistribStateManager() {
return delegatingDistribStateManager(config);
}
};
final Set<PolicyHelper.SessionWrapper> seenSessions = Sets.newHashSet();
final AtomicInteger completedThreads = new AtomicInteger(0);
final int COUNT_THREADS = 100;
Thread[] threads = new Thread[COUNT_THREADS];
for (int i = 0; i < COUNT_THREADS; i++) {
threads[i] = new Thread(() -> {
try {
// This thread requests a session, computes using it for 50ms then returns is, executes for 1000ms more,
// releases the sessions and finishes.
PolicyHelper.SessionWrapper session = PolicyHelper.getSession(solrCloudManager);
seenSessions.add(session);
Thread.sleep(50);
session.returnSession(session.get());
Thread.sleep(1000);
session.release();
completedThreads.incrementAndGet();
} catch (InterruptedException | IOException ignored) {
}
});
threads[i].start();
}
for (int i = 0; i < COUNT_THREADS; i++) {
threads[i].join(12000);
}
assertEquals(COUNT_THREADS, completedThreads.get());
// The value asserted below is somewhat arbitrary. Running locally max seen is 10, so hopefully 30 is safe.
// Idea is to verify we do not allocate a high number of sessions even if many concurrent session
// requests arrive at the same time. The session computing time is short in purpose. If it were long, it would be
// expected for more sessions to be allocated.
assertTrue("Too many sessions created: " + seenSessions.size(), seenSessions.size() < 30);
PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
assertTrue(sessionRef.isEmpty());
}
private DistribStateManager delegatingDistribStateManager(AutoScalingConfig config) {
return new DelegatingDistribStateManager(null) {
@Override