This commit is contained in:
Jan Bartel 2016-09-28 18:30:33 +10:00
parent b386551d3a
commit 6c1f0a6b4a
2 changed files with 233 additions and 46 deletions

View File

@ -71,6 +71,7 @@ public class GCloudSessionDataStore extends AbstractSessionDataStore
protected int _backoff = DEFAULT_BACKOFF_MS;
protected boolean _dsProvided = false;
protected boolean _indexesPresent = false;
protected EntityDataModel _model;
@ -302,6 +303,57 @@ public class GCloudSessionDataStore extends AbstractSessionDataStore
}
/**
* ExpiryInfo
*
* Information related to session expiry
*/
public static class ExpiryInfo
{
String _id;
String _lastNode;
long _expiry;
/**
* @param id session id
* @param lastNode last node id to manage the session
* @param expiry timestamp of expiry
*/
public ExpiryInfo (String id, String lastNode, long expiry)
{
_id = id;
_lastNode = lastNode;
_expiry = expiry;
}
/**
* @return the id
*/
public String getId()
{
return _id;
}
/**
* @return the lastNode
*/
public String getLastNode()
{
return _lastNode;
}
/**
* @return the expiry time
*/
public long getExpiry()
{
return _expiry;
}
}
public void setEntityDataModel(EntityDataModel model)
{
_model = model;
@ -363,7 +415,12 @@ public class GCloudSessionDataStore extends AbstractSessionDataStore
if (_model == null)
_model = new EntityDataModel();
_keyFactory = _datastore.newKeyFactory().kind(_model.getKind());
_keyFactory = _datastore.newKeyFactory().kind(_model.getKind());
_indexesPresent = checkIndexes();
if (!_indexesPresent)
LOG.warn("Session indexes not uploaded, falling back to less efficient queries");
super.doStart();
}
@ -443,43 +500,34 @@ public class GCloudSessionDataStore extends AbstractSessionDataStore
try
{
//get up to maxResult number of sessions that have expired
Query<ProjectionEntity> query = Query.projectionEntityQueryBuilder()
.kind(_model.getKind())
.projection(_model.getId(), _model.getLastNode(), _model.getExpiry())
.filter(CompositeFilter.and(PropertyFilter.gt(_model.getExpiry(), 0), PropertyFilter.le(_model.getExpiry(), now)))
.limit(_maxResults)
.build();
QueryResults<ProjectionEntity> presults = _datastore.run(query);
while (presults.hasNext())
Set<ExpiryInfo> info = null;
if (_indexesPresent)
info = queryExpiryByIndex();
else
info = queryExpiryByEntity();
for (ExpiryInfo item:info)
{
ProjectionEntity pe = presults.next();
String id = pe.getString(_model.getId());
String lastNode = pe.getString(_model.getLastNode());
long expiry = pe.getLong(_model.getExpiry());
if (StringUtil.isBlank(lastNode))
expired.add(id); //nobody managing it
if (StringUtil.isBlank(item.getLastNode()))
expired.add(item.getId()); //nobody managing it
else
{
if (_context.getWorkerName().equals(lastNode))
expired.add(id); //we're managing it, we can expire it
if (_context.getWorkerName().equals(item.getLastNode()))
expired.add(item.getId()); //we're managing it, we can expire it
else
{
if (_lastExpiryCheckTime <= 0)
{
//our first check, just look for sessions that we managed by another node that
//expired at least 3 graceperiods ago
if (expiry < (now - (1000L * (3 * _gracePeriodSec))))
expired.add(id);
if (item.getExpiry() < (now - (1000L * (3 * _gracePeriodSec))))
expired.add(item.getId());
}
else
{
//another node was last managing it, only expire it if it expired a graceperiod ago
if (expiry < (now - (1000L * _gracePeriodSec)))
expired.add(id);
if (item.getExpiry() < (now - (1000L * _gracePeriodSec)))
expired.add(item.getId());
}
}
}
@ -523,36 +571,152 @@ public class GCloudSessionDataStore extends AbstractSessionDataStore
}
/**
* A less efficient query to find sessions whose expiry time has passed:
* retrieves the whole Entity.
* @return set of ExpiryInfo representing the id, lastNode and expiry time of
* sessions that are expired
* @throws Exception
*/
protected Set<ExpiryInfo> queryExpiryByEntity () throws Exception
{
Set<ExpiryInfo> info = new HashSet<>();
//get up to maxResult number of sessions that have expired
Query<Entity> query = Query.entityQueryBuilder()
.kind(_model.getKind())
.filter(CompositeFilter.and(PropertyFilter.gt(_model.getExpiry(), 0), PropertyFilter.le(_model.getExpiry(), System.currentTimeMillis())))
.limit(_maxResults)
.build();
QueryResults<Entity> results;
if (LOG.isDebugEnabled())
{
long start = System.currentTimeMillis();
results = _datastore.run(query);
LOG.debug("Expiry query no index in {}ms", System.currentTimeMillis()-start);
}
else
results = _datastore.run(query);
while (results.hasNext())
{
Entity entity = results.next();
info.add(new ExpiryInfo(entity.getString(_model.getId()),entity.getString(_model.getLastNode()), entity.getLong(_model.getExpiry())));
}
return info;
}
/** An efficient query to find sessions whose expiry time has passed:
* uses a projection query, which requires indexes to be uploaded.
* @return id,lastnode and expiry time of sessions that have expired
* @throws Exception
*/
protected Set<ExpiryInfo> queryExpiryByIndex () throws Exception
{
Set<ExpiryInfo> info = new HashSet<>();
Query<ProjectionEntity> query = Query.projectionEntityQueryBuilder()
.kind(_model.getKind())
.projection(_model.getId(), _model.getLastNode(), _model.getExpiry())
.filter(CompositeFilter.and(PropertyFilter.gt(_model.getExpiry(), 0), PropertyFilter.le(_model.getExpiry(), System.currentTimeMillis())))
.limit(_maxResults)
.build();
QueryResults<ProjectionEntity> presults;
if (LOG.isDebugEnabled())
{
long start = System.currentTimeMillis();
presults = _datastore.run(query);
LOG.debug("Expiry query by index in {}ms", System.currentTimeMillis()-start);
}
else
presults = _datastore.run(query);
while (presults.hasNext())
{
ProjectionEntity pe = presults.next();
info.add(new ExpiryInfo(pe.getString(_model.getId()),pe.getString(_model.getLastNode()), pe.getLong(_model.getExpiry())));
}
return info;
}
/**
* @see org.eclipse.jetty.server.session.SessionDataStore#exists(java.lang.String)
*/
@Override
public boolean exists(String id) throws Exception
{
Query<ProjectionEntity> query = Query.projectionEntityQueryBuilder()
.kind(_model.getKind())
.projection(_model.getExpiry())
.filter(PropertyFilter.eq(_model.getId(), id))
.build();
QueryResults<ProjectionEntity> presults = _datastore.run(query);
if (presults.hasNext())
if (_indexesPresent)
{
ProjectionEntity pe = presults.next();
long expiry = pe.getLong(_model.getExpiry());
if (expiry <= 0)
return true; //never expires
Query<ProjectionEntity> query = Query.projectionEntityQueryBuilder()
.kind(_model.getKind())
.projection(_model.getExpiry())
.filter(PropertyFilter.eq(_model.getId(), id))
.build();
QueryResults<ProjectionEntity> presults;
if (LOG.isDebugEnabled())
{
long start = System.currentTimeMillis();
presults = _datastore.run(query);
LOG.debug("Exists query by index in {}ms", System.currentTimeMillis()-start);
}
else
return (expiry > System.currentTimeMillis()); //not expired yet
presults = _datastore.run(query);
if (presults.hasNext())
{
ProjectionEntity pe = presults.next();
return !isExpired(pe.getLong(_model.getExpiry()));
}
else
return false;
}
else
{
Query<Entity> query = Query.entityQueryBuilder()
.kind(_model.getKind())
.filter(PropertyFilter.eq(_model.getId(), id))
.build();
QueryResults<Entity> results;
if (LOG.isDebugEnabled())
{
long start = System.currentTimeMillis();
results = _datastore.run(query);
LOG.debug("Exists query no index in {}ms", System.currentTimeMillis()-start);
}
else
results = _datastore.run(query);
if (results.hasNext())
{
Entity entity = results.next();
return !isExpired(entity.getLong(_model.getExpiry()));
}
else
return false;
}
}
/**
* Check to see if the given time is in the past.
*
* @param timestamp the time to check
* @return false if the timestamp is <= 0, true if it is in the past
*/
protected boolean isExpired (long timestamp)
{
if (timestamp <= 0)
return false;
else
return timestamp < System.currentTimeMillis();
}
/**
@ -621,8 +785,31 @@ public class GCloudSessionDataStore extends AbstractSessionDataStore
String key = context.getCanonicalContextPath()+"_"+context.getVhost()+"_"+id;
return _keyFactory.newKey(key);
}
/**
* Check to see if indexes are available, in which case
* we can do more performant queries.
*/
protected boolean checkIndexes ()
{
try
{
Query<ProjectionEntity> query = Query.projectionEntityQueryBuilder()
.kind(_model.getKind())
.projection(_model.getExpiry())
.filter(PropertyFilter.eq(_model.getId(), "-"))
.build();
_datastore.run(query);
return true;
}
catch (DatastoreException e)
{
//need to assume that the problem is the index doesn't exist, because there
//is no specific code for that
return false;
}
}
/**
* Generate a gcloud datastore Entity from SessionData
* @param session the session data

View File

@ -351,7 +351,7 @@ public class SessionData implements Serializable
public boolean isExpiredAt (long time)
{
if (LOG.isDebugEnabled())
LOG.debug("Testing expiry on session {}: Never expires? {} Is expired?{}", _id, (getExpiry()<= 0), (getExpiry() < time));
LOG.debug("Testing expiry on session {}: expires at {} now {}", _id, getExpiry(), time);
if (getExpiry() <= 0)
return false; //never expires
return (getExpiry() <= time);