Fixed InfinispanSessionData serialization issues when using infinispan (#4600)

* Fixed InfinispanSessionData serialization issues when using infinispan
embedded
* Added version to InfinispanSessionDataSerializer
* Refactored InfinispanSessionDataSerializer to make use of
SessionDataMarshaller
* Tests with and without storeAsBinary enabled
* Merged InfinispanSessionDataSerializer and SessionDataMarshaller
* Wrapped Object Input/Output to Input/Output Streams
* Fixed an issue when reading the session data from a file.
* Modified tests to force read from file if available.
* static lazy init serializationContext
* synchronized initSerializationContext

Signed-off-by: Andrej Krota <andrej.krota@gmail.com>
This commit is contained in:
Andrej Krota 2020-03-23 11:14:01 +01:00 committed by GitHub
parent 34488b71b6
commit d445d7eb5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 563 additions and 37 deletions

View File

@ -0,0 +1,145 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.session.infinispan;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
/**
* BoundDelegatingInputStream
*
* An InputStream that delegates methods to an ObjectInput. The ObjectInput must start
* with an integer containing the length of the data.
*/
public class BoundDelegatingInputStream extends InputStream
{
protected final ObjectInput objectInput;
private final int length;
private int position = 0;
public BoundDelegatingInputStream(ObjectInput objectInput) throws IOException
{
this.objectInput = objectInput;
this.length = objectInput.readInt();
}
@Override
public int read() throws IOException
{
if (position < length)
{
position++;
return objectInput.read();
}
return -1;
}
@Override
public int read(byte[] b) throws IOException
{
int available = length - position;
int read = -1;
if (position == length)
{
return read;
}
if (b.length > available)
{
read = objectInput.read(b, 0, available);
}
else
{
read = objectInput.read(b);
}
if (read != -1)
{
position += read;
}
return read;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int read = -1;
if (position == length)
{
return read;
}
if (position + len > length)
{
read = objectInput.read(b, off, length - position);
}
else
{
read = objectInput.read(b, off, len);
}
if (read != -1)
{
position += read;
}
return read;
}
@Override
public long skip(long n) throws IOException
{
long skip = 0;
if (position + n < length)
{
skip = objectInput.skip(length - position);
}
else
{
skip = objectInput.skip(n);
}
if (skip > 0)
{
position += skip;
}
return skip;
}
@Override
public int available() throws IOException
{
if (position < length)
{
int available = objectInput.available();
if (position + available > length)
{
return length - position;
}
else
{
return available;
}
}
return 0;
}
@Override
public void close() throws IOException
{
objectInput.close();
}
}

View File

@ -1,3 +1,21 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.session.infinispan;
import java.io.ByteArrayInputStream;
@ -8,6 +26,7 @@ import java.util.Map;
import org.eclipse.jetty.server.session.SessionData;
import org.eclipse.jetty.util.ClassLoadingObjectInputStream;
import org.infinispan.commons.marshall.SerializeWith;
/**
* InfinispanSessionData
@ -19,6 +38,7 @@ import org.eclipse.jetty.util.ClassLoadingObjectInputStream;
* pool and thus these threads have no knowledge of the correct classloader to
* use.
*/
@SerializeWith(SessionDataMarshaller.class)
public class InfinispanSessionData extends SessionData
{
protected byte[] _serializedAttributes;

View File

@ -19,8 +19,14 @@
package org.eclipse.jetty.session.infinispan;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.protostream.FileDescriptorSource;
import org.infinispan.protostream.MessageMarshaller;
import org.infinispan.protostream.ProtobufUtil;
import org.infinispan.protostream.SerializationContext;
/**
* SessionDataMarshaller
@ -30,13 +36,30 @@ import org.infinispan.protostream.MessageMarshaller;
* control to ensure that session attributes can be deserialized using either
* the container class loader or the webapp classloader, as appropriate.
*/
public class SessionDataMarshaller implements MessageMarshaller<InfinispanSessionData>
public class SessionDataMarshaller
implements MessageMarshaller<InfinispanSessionData>, Externalizer<InfinispanSessionData>
{
/**
* The version of the serializer.
*/
private static final int VERSION = 0;
private static SerializationContext serializationContext;
private static synchronized void initSerializationContext() throws IOException
{
if (serializationContext != null)
{
return;
}
FileDescriptorSource fds = new FileDescriptorSource();
fds.addProtoFiles("/session.proto");
SerializationContext sCtx = ProtobufUtil.newSerializationContext();
sCtx.registerProtoFiles(fds);
sCtx.registerMarshaller(new SessionDataMarshaller());
serializationContext = sCtx;
}
@Override
public Class<? extends InfinispanSessionData> getJavaClass()
{
@ -49,6 +72,39 @@ public class SessionDataMarshaller implements MessageMarshaller<InfinispanSessio
return "org_eclipse_jetty_session_infinispan.InfinispanSessionData";
}
@Override
public InfinispanSessionData readObject(ObjectInput input) throws IOException, ClassNotFoundException
{
if (serializationContext == null)
{
initSerializationContext();
}
// invokes readFrom(ProtoStreamReader)
InfinispanSessionData data = ProtobufUtil.readFrom(serializationContext, new BoundDelegatingInputStream(input),
InfinispanSessionData.class);
if (data != null)
{
data.deserializeAttributes();
}
return data;
}
@Override
public void writeObject(ObjectOutput output, InfinispanSessionData object) throws IOException
{
if (serializationContext == null)
{
initSerializationContext();
}
// invokes writeTo(ProtoStreamWriter, InfinispanSessionData)
byte[] data = ProtobufUtil.toByteArray(serializationContext, object);
int length = data.length;
output.writeInt(length);
output.write(data);
}
@Override
public InfinispanSessionData readFrom(ProtoStreamReader in) throws IOException
{
@ -67,7 +123,8 @@ public class SessionDataMarshaller implements MessageMarshaller<InfinispanSessio
long expiry = in.readLong("expiry");
long maxInactiveMs = in.readLong("maxInactiveMs");
InfinispanSessionData sd = new InfinispanSessionData(id, cpath, vhost, created, accessed, lastAccessed, maxInactiveMs);
InfinispanSessionData sd = new InfinispanSessionData(id, cpath, vhost, created, accessed, lastAccessed,
maxInactiveMs);
sd.setCookieSet(cookieSet);
sd.setLastNode(lastNode);
sd.setExpiry(expiry);
@ -103,4 +160,5 @@ public class SessionDataMarshaller implements MessageMarshaller<InfinispanSessio
sdata.serializeAttributes();
out.writeBytes("attributes", sdata.getSerializedAttributes());
}
}

View File

@ -0,0 +1,67 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.session;
import org.eclipse.jetty.session.infinispan.InfinispanSessionDataStoreFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
* ClusteredSerializedSessionScavengingTest
*/
public class ClusteredSerializedSessionScavengingTest extends AbstractClusteredSessionScavengingTest
{
public static InfinispanTestSupport __testSupport;
@BeforeAll
public static void setup() throws Exception
{
__testSupport = new InfinispanTestSupport();
__testSupport.setUseFileStore(true);
__testSupport.setSerializeSessionData(true);
__testSupport.setup();
}
@AfterAll
public static void teardown() throws Exception
{
if (__testSupport != null)
__testSupport.teardown();
}
@Override
@Test
public void testClusteredScavenge()
throws Exception
{
super.testClusteredScavenge();
}
/**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/
@Override
public SessionDataStoreFactory createSessionDataStoreFactory()
{
InfinispanSessionDataStoreFactory factory = new InfinispanSessionDataStoreFactory();
factory.setCache(__testSupport.getCache());
return factory;
}
}

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.session;
import org.junit.jupiter.api.BeforeEach;
/**
* HotInitInfinispanSessionDataStoreTest
*/
public class InfinispanFileSessionDataStoreTest extends InfinispanSessionDataStoreTest
{
@BeforeEach
public void setup() throws Exception
{
_testSupport = new InfinispanTestSupport();
_testSupport.setUseFileStore(true);
_testSupport.setup();
}
}

View File

@ -18,13 +18,10 @@
package org.eclipse.jetty.server.session;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.session.infinispan.InfinispanSessionData;
import org.eclipse.jetty.session.infinispan.InfinispanSessionDataStore;
import org.eclipse.jetty.session.infinispan.InfinispanSessionDataStoreFactory;
import org.infinispan.Cache;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
@ -32,8 +29,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/**
@ -164,30 +160,26 @@ public class InfinispanSessionDataStoreTest extends AbstractSessionDataStoreTest
@Test
public void testQuery() throws Exception
{
Cache<String, SessionData> cache = _testSupport.getCache();
InfinispanSessionData sd1 = new InfinispanSessionData("sd1", "", "", 0, 0, 0, 1000);
sd1.setLastNode("fred1");
_testSupport.getCache().put("session1", sd1);
SessionData sd1 = new SessionData("sd1", "", "", 0, 0, 0, 0);
SessionData sd2 = new SessionData("sd2", "", "", 0, 0, 0, 1000);
sd2.setExpiry(100L); //long ago
SessionData sd3 = new SessionData("sd3", "", "", 0, 0, 0, 0);
InfinispanSessionData sd2 = new InfinispanSessionData("sd2", "", "", 0, 0, 0, 2000);
sd2.setLastNode("fred2");
_testSupport.getCache().put("session2", sd2);
cache.put("session1", sd1);
cache.put("session2", sd2);
cache.put("session3", sd3);
InfinispanSessionData sd3 = new InfinispanSessionData("sd3", "", "", 0, 0, 0, 3000);
sd3.setLastNode("fred3");
_testSupport.getCache().put("session3", sd3);
QueryFactory qf = Search.getQueryFactory(cache);
Query q = qf.from(SessionData.class).select("id").having("expiry").lte(System.currentTimeMillis()).and().having("expiry").gt(0).toBuilder().build();
QueryFactory qf = Search.getQueryFactory(_testSupport.getCache());
List<Object[]> list = q.list();
List<String> ids = new ArrayList<>();
for (Object[] sl : list)
for (int i = 0; i <= 3; i++)
{
ids.add((String)sl[0]);
}
assertFalse(ids.isEmpty());
assertTrue(1 == ids.size());
assertTrue(ids.contains("sd2"));
long now = System.currentTimeMillis();
Query q = qf.from(InfinispanSessionData.class).having("expiry").lt(now).build();
assertEquals(i, q.list().size());
Thread.sleep(1000);
}
}
}

View File

@ -27,8 +27,8 @@ import org.eclipse.jetty.util.IO;
import org.hibernate.search.cfg.Environment;
import org.hibernate.search.cfg.SearchMapping;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.ConfigurationChildBuilder;
import org.infinispan.configuration.cache.Index;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
@ -48,6 +48,7 @@ public class InfinispanTestSupport
public ConfigurationBuilder _builder;
private File _tmpdir;
private boolean _useFileStore;
private boolean _serializeSessionData;
private String _name;
public static EmbeddedCacheManager _manager;
@ -82,6 +83,11 @@ public class InfinispanTestSupport
_useFileStore = useFileStore;
}
public void setSerializeSessionData(boolean serializeSessionData)
{
_serializeSessionData = serializeSessionData;
}
public Cache getCache()
{
return _cache;
@ -106,25 +112,33 @@ public class InfinispanTestSupport
_tmpdir.delete();
_tmpdir.mkdir();
Configuration config = _builder.indexing()
ConfigurationChildBuilder b = _builder.indexing()
.index(Index.ALL)
.addIndexedEntity(SessionData.class)
.withProperties(properties)
.persistence()
.addSingleFileStore()
.location(_tmpdir.getAbsolutePath())
.storeAsBinary()
.build();
.location(_tmpdir.getAbsolutePath());
if (_serializeSessionData)
{
b = b.storeAsBinary().enable();
}
_manager.defineConfiguration(_name, config);
_manager.defineConfiguration(_name, b.build());
}
else
{
_manager.defineConfiguration(_name, _builder.indexing()
ConfigurationChildBuilder b = _builder.indexing()
.withProperties(properties)
.index(Index.ALL)
.addIndexedEntity(SessionData.class)
.build());
.addIndexedEntity(SessionData.class);
if (_serializeSessionData)
{
b = b.storeAsBinary().enable();
}
_manager.defineConfiguration(_name, b.build());
}
_cache = _manager.getCache(_name);
}
@ -163,6 +177,13 @@ public class InfinispanTestSupport
public boolean checkSessionPersisted(SessionData data)
throws Exception
{
//evicts the object from memory. Forces the cache to fetch the data from file
if (_useFileStore)
{
_cache.evict(data.getContextPath() + "_" + data.getVhost() + "_" + data.getId());
}
Object obj = _cache.get(data.getContextPath() + "_" + data.getVhost() + "_" + data.getId());
if (obj == null)
return false;

View File

@ -0,0 +1,186 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.session;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.session.infinispan.InfinispanSessionData;
import org.eclipse.jetty.session.infinispan.InfinispanSessionDataStore;
import org.eclipse.jetty.session.infinispan.InfinispanSessionDataStoreFactory;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
/**
* SerializedInfinispanSessionDataStoreTest
*/
public class SerializedInfinispanSessionDataStoreTest extends AbstractSessionDataStoreTest
{
public InfinispanTestSupport _testSupport;
@BeforeEach
public void setup() throws Exception
{
_testSupport = new InfinispanTestSupport();
_testSupport.setSerializeSessionData(true);
_testSupport.setup();
}
@AfterEach
public void teardown() throws Exception
{
_testSupport.teardown();
}
@Override
public SessionDataStoreFactory createSessionDataStoreFactory()
{
InfinispanSessionDataStoreFactory factory = new InfinispanSessionDataStoreFactory();
factory.setCache(_testSupport.getCache());
return factory;
}
@Override
public void persistSession(SessionData data) throws Exception
{
_testSupport.createSession(data);
}
@Override
public void persistUnreadableSession(SessionData data) throws Exception
{
//Not used by testLoadSessionFails()
}
@Override
public boolean checkSessionExists(SessionData data) throws Exception
{
return _testSupport.checkSessionExists(data);
}
/**
* This test deliberately sets the infinispan cache to null to
* try and provoke an exception in the InfinispanSessionDataStore.load() method.
*/
@Override
public void testLoadSessionFails() throws Exception
{
//create the SessionDataStore
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/test");
SessionDataStoreFactory factory = createSessionDataStoreFactory();
((AbstractSessionDataStoreFactory)factory).setGracePeriodSec(GRACE_PERIOD_SEC);
SessionDataStore store = factory.getSessionDataStore(context.getSessionHandler());
SessionContext sessionContext = new SessionContext("foo", context.getServletContext());
store.initialize(sessionContext);
//persist a session
long now = System.currentTimeMillis();
SessionData data = store.newSessionData("222", 100, now, now - 1, -1);
data.setLastNode(sessionContext.getWorkerName());
persistSession(data);
store.start();
((InfinispanSessionDataStore)store).setCache(null);
//test that loading it fails
try
{
store.load("222");
fail("Session should be unreadable");
}
catch (UnreadableSessionDataException e)
{
//expected exception
}
}
/**
* This test currently won't work for Infinispan - there is currently no
* means to query it to find sessions that have expired.
*
* @see org.eclipse.jetty.server.session.AbstractSessionDataStoreTest#testGetExpiredPersistedAndExpiredOnly()
*/
@Override
public void testGetExpiredPersistedAndExpiredOnly() throws Exception
{
}
/**
* This test won't work for Infinispan - there is currently no
* means to query infinispan to find other expired sessions.
*/
@Override
public void testGetExpiredDifferentNode() throws Exception
{
//Ignore
}
/**
*
*/
@Override
public boolean checkSessionPersisted(SessionData data) throws Exception
{
ClassLoader old = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(_contextClassLoader);
try
{
return _testSupport.checkSessionPersisted(data);
}
finally
{
Thread.currentThread().setContextClassLoader(old);
}
}
@Test
public void testQuery() throws Exception
{
InfinispanSessionData sd1 = new InfinispanSessionData("sd1", "", "", 0, 0, 0, 1000);
sd1.setLastNode("fred1");
_testSupport.getCache().put("session1", sd1);
InfinispanSessionData sd2 = new InfinispanSessionData("sd2", "", "", 0, 0, 0, 2000);
sd2.setLastNode("fred2");
_testSupport.getCache().put("session2", sd2);
InfinispanSessionData sd3 = new InfinispanSessionData("sd3", "", "", 0, 0, 0, 3000);
sd3.setLastNode("fred3");
_testSupport.getCache().put("session3", sd3);
QueryFactory qf = Search.getQueryFactory(_testSupport.getCache());
for (int i = 0; i <= 3; i++)
{
long now = System.currentTimeMillis();
Query q = qf.from(InfinispanSessionData.class).having("expiry").lt(now).build();
assertEquals(i, q.list().size());
Thread.sleep(1000);
}
}
}