HHH-10057 hibernate-infinispan incompatible with Infinispan 8.0.0.CR1
* ISPN-5609 changed InvalidateCommand constructors: used reflection to work around that; now should work with 8.0.0.CR1 * renamed BeginInvalidationCommand.getLockOwner to getSessionTransactionId() to prevent further conflicts * added commands tests
This commit is contained in:
parent
1d62197b9d
commit
64137c3619
|
@ -125,14 +125,14 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements
|
|||
// increment invalidations counter if statistics maintained
|
||||
incrementInvalidations();
|
||||
InvalidateCommand invalidateCommand;
|
||||
Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys);
|
||||
Object sessionTransactionId = putFromLoadValidator.registerRemoteInvalidations(keys);
|
||||
if (!isLocalModeForced(command)) {
|
||||
if (lockOwner == null) {
|
||||
if (sessionTransactionId == null) {
|
||||
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
|
||||
}
|
||||
else {
|
||||
invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
|
||||
InfinispanCollections.<Flag>emptySet(), keys, lockOwner);
|
||||
InfinispanCollections.<Flag>emptySet(), keys, sessionTransactionId);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
|
||||
|
|
|
@ -45,16 +45,16 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
|
|||
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
|
||||
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
|
||||
for (Object key : command.getKeys()) {
|
||||
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
|
||||
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getSessionTransactionId(), key);
|
||||
}
|
||||
}
|
||||
return invokeNextInterceptor(ctx, command);
|
||||
}
|
||||
|
||||
public void broadcastEndInvalidationCommand(Object[] keys, Object lockOwner) {
|
||||
assert lockOwner != null;
|
||||
public void broadcastEndInvalidationCommand(Object[] keys, Object sessionTransactionId) {
|
||||
assert sessionTransactionId != null;
|
||||
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
|
||||
cacheName, keys, lockOwner);
|
||||
cacheName, keys, sessionTransactionId);
|
||||
rpcManager.invokeRemotely(null, endInvalidationCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,10 +7,15 @@
|
|||
package org.hibernate.cache.infinispan.util;
|
||||
|
||||
import org.hibernate.internal.util.compare.EqualsHelper;
|
||||
import org.infinispan.commands.write.AbstractDataWriteCommand;
|
||||
import org.infinispan.commands.write.InvalidateCommand;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.infinispan.notifications.cachelistener.CacheNotifier;
|
||||
import org.infinispan.remoting.transport.Address;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -18,31 +23,85 @@ import java.util.Set;
|
|||
* @author Radim Vansa <rvansa@redhat.com>
|
||||
*/
|
||||
public class BeginInvalidationCommand extends InvalidateCommand {
|
||||
private Object lockOwner;
|
||||
// this is a hack to keep compatibility with both Infinispan 7 and 8
|
||||
// TODO: remove this when rebasing on Infinispan 8
|
||||
private static final Field commandInvocationIdField;
|
||||
private static final Method generateIdMethod;
|
||||
|
||||
static {
|
||||
Field commandInvocationId = null;
|
||||
Method generateId = null;
|
||||
try {
|
||||
commandInvocationId = AbstractDataWriteCommand.class.getDeclaredField("commandInvocationId");
|
||||
commandInvocationId.setAccessible(true);
|
||||
Class commandInvocationIdClass = Class.forName("org.infinispan.commands.CommandInvocationId");
|
||||
generateId = commandInvocationIdClass.getMethod("generateId", Address.class);
|
||||
}
|
||||
catch (NoSuchFieldException e) {
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
// already found field and not the class?
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
catch (NoSuchMethodException e) {
|
||||
// already found field and not the method?
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
commandInvocationIdField = commandInvocationId;
|
||||
generateIdMethod = generateId;
|
||||
}
|
||||
|
||||
private Object sessionTransactionId;
|
||||
|
||||
public BeginInvalidationCommand() {
|
||||
}
|
||||
|
||||
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, Object[] keys, Object lockOwner) {
|
||||
super(notifier, flags, keys);
|
||||
this.lockOwner = lockOwner;
|
||||
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, Object[] keys, Address address, Object sessionTransactionId) {
|
||||
super();
|
||||
this.notifier = notifier;
|
||||
this.flags = flags;
|
||||
this.keys = keys;
|
||||
this.sessionTransactionId = sessionTransactionId;
|
||||
if (commandInvocationIdField != null) {
|
||||
try {
|
||||
commandInvocationIdField.set(this, generateIdMethod.invoke(null, address));
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Object getLockOwner() {
|
||||
return lockOwner;
|
||||
public Object getSessionTransactionId() {
|
||||
return sessionTransactionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] getParameters() {
|
||||
Object commandInvocationId = null;
|
||||
if (commandInvocationIdField != null) {
|
||||
try {
|
||||
commandInvocationId = commandInvocationIdField.get(this);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
if (keys == null || keys.length == 0) {
|
||||
return new Object[]{0, lockOwner};
|
||||
return new Object[]{flags, sessionTransactionId, commandInvocationId, 0};
|
||||
}
|
||||
if (keys.length == 1) {
|
||||
return new Object[]{1, keys[0], lockOwner};
|
||||
return new Object[]{flags, sessionTransactionId, commandInvocationId, 1, keys[0]};
|
||||
}
|
||||
Object[] retval = new Object[keys.length + 2];
|
||||
retval[0] = keys.length;
|
||||
System.arraycopy(keys, 0, retval, 1, keys.length);
|
||||
Object[] retval = new Object[keys.length + 4];
|
||||
retval[0] = flags;
|
||||
retval[1] = sessionTransactionId;
|
||||
retval[2] = commandInvocationId;
|
||||
retval[3] = keys.length;
|
||||
System.arraycopy(keys, 0, retval, 4, keys.length);
|
||||
return retval;
|
||||
}
|
||||
|
||||
|
@ -51,15 +110,25 @@ public class BeginInvalidationCommand extends InvalidateCommand {
|
|||
if (commandId != CacheCommandIds.BEGIN_INVALIDATION) {
|
||||
throw new IllegalStateException("Invalid method id");
|
||||
}
|
||||
int size = (Integer) args[0];
|
||||
flags = (Set<Flag>) args[0];
|
||||
sessionTransactionId = args[1];
|
||||
Object commandInvocationId = args[2];
|
||||
if (commandInvocationIdField != null) {
|
||||
try {
|
||||
commandInvocationIdField.set(this, commandInvocationId);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
int size = (Integer) args[3];
|
||||
keys = new Object[size];
|
||||
if (size == 1) {
|
||||
keys[0] = args[1];
|
||||
keys[0] = args[4];
|
||||
}
|
||||
else if (size > 0) {
|
||||
System.arraycopy(args, 1, keys, 0, size);
|
||||
System.arraycopy(args, 4, keys, 0, size);
|
||||
}
|
||||
lockOwner = args[args.length - 1];
|
||||
}
|
||||
|
||||
|
||||
|
@ -75,7 +144,7 @@ public class BeginInvalidationCommand extends InvalidateCommand {
|
|||
}
|
||||
if (o instanceof BeginInvalidationCommand) {
|
||||
BeginInvalidationCommand bic = (BeginInvalidationCommand) o;
|
||||
return EqualsHelper.equals(lockOwner, bic.lockOwner);
|
||||
return EqualsHelper.equals(sessionTransactionId, bic.sessionTransactionId);
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
|
@ -84,12 +153,12 @@ public class BeginInvalidationCommand extends InvalidateCommand {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return super.hashCode() + (lockOwner == null ? 0 : lockOwner.hashCode());
|
||||
return super.hashCode() + (sessionTransactionId == null ? 0 : sessionTransactionId.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) +
|
||||
", lockOwner=" + lockOwner + '}';
|
||||
", sessionTransactionId=" + sessionTransactionId + '}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.infinispan.commands.module.ModuleCommandInitializer;
|
|||
import org.infinispan.configuration.cache.Configuration;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.infinispan.factories.annotations.Inject;
|
||||
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
|
||||
import org.infinispan.notifications.cachelistener.CacheNotifier;
|
||||
|
||||
import java.util.Set;
|
||||
|
@ -29,11 +30,13 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
|
|||
= new ConcurrentHashMap<String, PutFromLoadValidator>();
|
||||
private CacheNotifier notifier;
|
||||
private Configuration configuration;
|
||||
private ClusteringDependentLogic clusteringDependentLogic;
|
||||
|
||||
@Inject
|
||||
public void injectDependencies(CacheNotifier notifier, Configuration configuration) {
|
||||
public void injectDependencies(CacheNotifier notifier, Configuration configuration, ClusteringDependentLogic clusteringDependentLogic) {
|
||||
this.notifier = notifier;
|
||||
this.configuration = configuration;
|
||||
this.clusteringDependentLogic = clusteringDependentLogic;
|
||||
}
|
||||
|
||||
public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) {
|
||||
|
@ -62,12 +65,12 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
|
|||
return new EvictAllCommand( regionName );
|
||||
}
|
||||
|
||||
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object lockOwner) {
|
||||
return new BeginInvalidationCommand(notifier, flags, keys, lockOwner);
|
||||
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object sessionTransactionId) {
|
||||
return new BeginInvalidationCommand(notifier, flags, keys, clusteringDependentLogic.getAddress(), sessionTransactionId);
|
||||
}
|
||||
|
||||
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
|
||||
return new EndInvalidationCommand( cacheName, keys, lockOwner );
|
||||
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) {
|
||||
return new EndInvalidationCommand( cacheName, keys, sessionTransactionId );
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,7 +20,7 @@ import java.util.Arrays;
|
|||
*/
|
||||
public class EndInvalidationCommand extends BaseRpcCommand {
|
||||
private Object[] keys;
|
||||
private Object lockOwner;
|
||||
private Object sessionTransactionId;
|
||||
private PutFromLoadValidator putFromLoadValidator;
|
||||
|
||||
public EndInvalidationCommand(String cacheName) {
|
||||
|
@ -30,16 +30,16 @@ public class EndInvalidationCommand extends BaseRpcCommand {
|
|||
/**
|
||||
* @param cacheName name of the cache to evict
|
||||
*/
|
||||
public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
|
||||
public EndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) {
|
||||
super(cacheName);
|
||||
this.keys = keys;
|
||||
this.lockOwner = lockOwner;
|
||||
this.sessionTransactionId = sessionTransactionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object perform(InvocationContext ctx) throws Throwable {
|
||||
for (Object key : keys) {
|
||||
putFromLoadValidator.endInvalidatingKey(lockOwner, key);
|
||||
putFromLoadValidator.endInvalidatingKey(sessionTransactionId, key);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -51,13 +51,13 @@ public class EndInvalidationCommand extends BaseRpcCommand {
|
|||
|
||||
@Override
|
||||
public Object[] getParameters() {
|
||||
return new Object[] { keys, lockOwner };
|
||||
return new Object[] { keys, sessionTransactionId};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setParameters(int commandId, Object[] parameters) {
|
||||
keys = (Object[]) parameters[0];
|
||||
lockOwner = parameters[1];
|
||||
sessionTransactionId = parameters[1];
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -74,12 +74,41 @@ public class EndInvalidationCommand extends BaseRpcCommand {
|
|||
this.putFromLoadValidator = putFromLoadValidator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof EndInvalidationCommand)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
EndInvalidationCommand that = (EndInvalidationCommand) o;
|
||||
|
||||
if (cacheName == null ? cacheName != null : !cacheName.equals(that.cacheName)) {
|
||||
return false;
|
||||
}
|
||||
if (!Arrays.equals(keys, that.keys)) {
|
||||
return false;
|
||||
}
|
||||
return !(sessionTransactionId != null ? !sessionTransactionId.equals(that.sessionTransactionId) : that.sessionTransactionId != null);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = cacheName != null ? cacheName.hashCode() : 0;
|
||||
result = 31 * result + (keys != null ? Arrays.hashCode(keys) : 0);
|
||||
result = 31 * result + (sessionTransactionId != null ? sessionTransactionId.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("EndInvalidationCommand{");
|
||||
sb.append("cacheName=").append(cacheName);
|
||||
sb.append(", keys=").append(Arrays.toString(keys));
|
||||
sb.append(", lockOwner=").append(lockOwner);
|
||||
sb.append(", sessionTransactionId=").append(sessionTransactionId);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
package org.hibernate.test.cache.infinispan.util;
|
||||
|
||||
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
|
||||
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
|
||||
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
|
||||
import org.infinispan.commands.ReplicableCommand;
|
||||
import org.infinispan.distribution.TestAddress;
|
||||
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.jgroups.util.Util.assertEquals;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* @author Radim Vansa <rvansa@redhat.com>
|
||||
*/
|
||||
public class CacheCommandsInitializerTest {
|
||||
private static CacheCommandInitializer initializer = new CacheCommandInitializer();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
ClusteringDependentLogic cdl = mock(ClusteringDependentLogic.class);
|
||||
when(cdl.getAddress()).thenReturn(new TestAddress(0));
|
||||
initializer.injectDependencies(null, null, cdl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeginInvalidationCommand1() {
|
||||
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{}, UUID.randomUUID());
|
||||
checkParameters(command, () -> new BeginInvalidationCommand());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeginInvalidationCommand2() {
|
||||
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 1 }, UUID.randomUUID());
|
||||
checkParameters(command, () -> new BeginInvalidationCommand());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeginInvalidationCommand3() {
|
||||
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 2, 3 }, UUID.randomUUID());
|
||||
checkParameters(command, () -> new BeginInvalidationCommand());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndInvalidationCommmand() {
|
||||
EndInvalidationCommand command = initializer.buildEndInvalidationCommand("foo", new Object[] { 2, 3 }, UUID.randomUUID());
|
||||
checkParameters(command, () -> new EndInvalidationCommand("foo"));
|
||||
}
|
||||
|
||||
protected <T extends ReplicableCommand> void checkParameters(T command, Supplier<T> commandSupplier) {
|
||||
Object[] parameters = command.getParameters();
|
||||
ReplicableCommand newCommand = commandSupplier.get();
|
||||
newCommand.setParameters(command.getCommandId(), parameters);
|
||||
assertEquals(command, newCommand);
|
||||
assertArrayEquals(parameters, newCommand.getParameters());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue