Begin scavenge impl.

This commit is contained in:
Jan Bartel 2015-09-22 07:35:49 +10:00
parent e5b5bea259
commit 005d0f2ab8
1 changed files with 84 additions and 51 deletions

View File

@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -50,6 +51,9 @@ import com.google.gcloud.datastore.DatastoreFactory;
import com.google.gcloud.datastore.Entity;
import com.google.gcloud.datastore.Key;
import com.google.gcloud.datastore.KeyFactory;
import com.google.gcloud.datastore.Query;
import com.google.gcloud.datastore.Query.ResultType;
import com.google.gcloud.datastore.QueryResults;
@ -64,6 +68,7 @@ public class GCloudSessionManager extends AbstractSessionManager
public static final String KIND = "GCloudSession";
public static final int DEFAULT_MAX_QUERY_RESULTS = 100;
/**
* Sessions known to this node held in memory
@ -91,6 +96,10 @@ public class GCloudSessionManager extends AbstractSessionManager
private SessionEntityConverter _converter;
private int _maxResults;
/**
* Scavenger
*
@ -581,6 +590,14 @@ public class GCloudSessionManager extends AbstractSessionManager
return _expiryTime;
}
public boolean isExpiredAt (long time)
{
if (_expiryTime <= 0)
return false; //never expires
return (_expiryTime <= time);
}
public void swapId (String newId, String newNodeId)
{
//TODO probably synchronize rather than use the access/complete lock?
@ -719,58 +736,61 @@ public class GCloudSessionManager extends AbstractSessionManager
*/
public void scavenge ()
{
//scavenge in the database every so often
//TODO
/* Set<String> candidateIds = new HashSet<String>();
long now = System.currentTimeMillis();
LOG.info("SessionManager for context {} scavenging at {} ", getContextPath(getContext()), now);
synchronized (_sessions)
{
for (Map.Entry<String, Session> entry:_sessions.entrySet())
{
long expiry = entry.getValue().getExpiry();
if (expiry > 0 && expiry < now)
candidateIds.add(entry.getKey());
}
}
for (String candidateId:candidateIds)
{
if (LOG.isDebugEnabled())
LOG.debug("Session {} expired ", candidateId);
Session candidateSession = _sessions.get(candidateId);
if (candidateSession != null)
{
//double check the state of the session in the cache, as the
//session may have migrated to another node. This leaves a window
//where the cached session may have been changed by another node
Session cachedSession = load(makeKey(candidateId, _context));
if (cachedSession == null)
{
if (LOG.isDebugEnabled()) LOG.debug("Locally expired session({}) does not exist in cluster ",candidateId);
//the session no longer exists, do a full invalidation
candidateSession.timeout();
}
else if (getSessionIdManager().getWorkerName().equals(cachedSession.getLastNode()))
{
if (LOG.isDebugEnabled()) LOG.debug("Expiring session({}) local to session manager",candidateId);
//if I am the master of the session then it can be timed out
candidateSession.timeout();
}
else
{
//some other node is the master of the session, simply remove it from my memory
if (LOG.isDebugEnabled()) LOG.debug("Session({}) not local to this session manager, removing from local memory", candidateId);
candidateSession.willPassivate();
_sessions.remove(candidateSession.getClusterId());
}
}
}*/
//always scavenge in memory
scavengeMemory();
}
protected void scavengeMemory()
{
long now = System.currentTimeMillis();
for (Session s:_sessions.values())
{
if (s.isExpiredAt(now))
s.timeout();
}
}
protected void scavengeGCloudDataStore()
throws Exception
{
//query the datastore for sessions that have expired
long now = System.currentTimeMillis();
Query<Entity> query = Query.gqlQueryBuilder(ResultType.ENTITY, "select * from "+KIND+" where expiry < @1 LIMIT "+_maxResults)
.setBinding("1", now)
.build();
QueryResults<Entity> results = _datastore.run(query);
while (results.hasNext())
{
Entity sessionEntity = results.next();
scavengeSession(sessionEntity);
}
}
protected void scavengeSession (Entity e)
throws Exception
{
long now = System.currentTimeMillis();
Session session = _converter.sessionFromEntity(e);
if (session == null)
return;
//if the session isn't in memory already, put it there so we can do a normal timeout call
Session memSession =_sessions.get(session.getId());
if (memSession == null)
memSession = _sessions.putIfAbsent(session.getId(), memSession);
//final check
if (memSession.isExpiredAt(now))
memSession.timeout();
}
public long getScavengeInterval ()
{
@ -846,6 +866,21 @@ public class GCloudSessionManager extends AbstractSessionManager
{
_staleIntervalSec = staleIntervalSec;
}
public int getMaxResults()
{
return _maxResults;
}
public void setMaxResults(int maxResults)
{
if (_maxResults <= 0)
_maxResults = DEFAULT_MAX_QUERY_RESULTS;
else
_maxResults = maxResults;
}
/**
@ -894,7 +929,7 @@ public class GCloudSessionManager extends AbstractSessionManager
long now = System.currentTimeMillis();
try
{
//if the session is not in this node's memory, then load it from the cluster cache
//if the session is not in this node's memory, then load it from the datastore
if (memSession == null)
{
if (LOG.isDebugEnabled())
@ -903,8 +938,6 @@ public class GCloudSessionManager extends AbstractSessionManager
session = load(makeKey(idInCluster, _context));
if (session != null)
{
//We retrieved a session with the same key from the database
//Check that it wasn't expired
if (session.getExpiry() > 0 && session.getExpiry() <= now)
{