HHH-10057 hibernate-infinispan incompatible with Infinispan 8.0.0.CR1

* ISPN-5609 changed InvalidateCommand constructors: used reflection to work around that; now should work with 8.0.0.CR1
* renamed BeginInvalidationCommand.getLockOwner to getSessionTransactionId() to prevent further conflicts
* added commands tests

(cherry picked from commit 64137c3619)
This commit is contained in:
Radim Vansa 2015-08-26 14:16:11 +02:00 committed by Steve Ebersole
parent e048b36e21
commit 92e328be0d
6 changed files with 203 additions and 37 deletions

View File

@ -125,14 +125,14 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements
// increment invalidations counter if statistics maintained // increment invalidations counter if statistics maintained
incrementInvalidations(); incrementInvalidations();
InvalidateCommand invalidateCommand; InvalidateCommand invalidateCommand;
Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys); Object sessionTransactionId = putFromLoadValidator.registerRemoteInvalidations(keys);
if (!isLocalModeForced(command)) { if (!isLocalModeForced(command)) {
if (lockOwner == null) { if (sessionTransactionId == null) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys); invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
} }
else { else {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand( invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
InfinispanCollections.<Flag>emptySet(), keys, lockOwner); InfinispanCollections.<Flag>emptySet(), keys, sessionTransactionId);
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand); log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);

View File

@ -45,16 +45,16 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable { public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) { if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) { for (Object key : command.getKeys()) {
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key); putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getSessionTransactionId(), key);
} }
} }
return invokeNextInterceptor(ctx, command); return invokeNextInterceptor(ctx, command);
} }
public void broadcastEndInvalidationCommand(Object[] keys, Object lockOwner) { public void broadcastEndInvalidationCommand(Object[] keys, Object sessionTransactionId) {
assert lockOwner != null; assert sessionTransactionId != null;
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand( EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
cacheName, keys, lockOwner); cacheName, keys, sessionTransactionId);
rpcManager.invokeRemotely(null, endInvalidationCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE)); rpcManager.invokeRemotely(null, endInvalidationCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
} }
} }

View File

@ -7,10 +7,15 @@
package org.hibernate.cache.infinispan.util; package org.hibernate.cache.infinispan.util;
import org.hibernate.internal.util.compare.EqualsHelper; import org.hibernate.internal.util.compare.EqualsHelper;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.notifications.cachelistener.CacheNotifier; import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.remoting.transport.Address;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Set;
@ -18,31 +23,85 @@ import java.util.Set;
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
public class BeginInvalidationCommand extends InvalidateCommand { public class BeginInvalidationCommand extends InvalidateCommand {
private Object lockOwner; // this is a hack to keep compatibility with both Infinispan 7 and 8
// TODO: remove this when rebasing on Infinispan 8
private static final Field commandInvocationIdField;
private static final Method generateIdMethod;
static {
Field commandInvocationId = null;
Method generateId = null;
try {
commandInvocationId = AbstractDataWriteCommand.class.getDeclaredField("commandInvocationId");
commandInvocationId.setAccessible(true);
Class commandInvocationIdClass = Class.forName("org.infinispan.commands.CommandInvocationId");
generateId = commandInvocationIdClass.getMethod("generateId", Address.class);
}
catch (NoSuchFieldException e) {
}
catch (ClassNotFoundException e) {
// already found field and not the class?
throw new IllegalStateException(e);
}
catch (NoSuchMethodException e) {
// already found field and not the method?
throw new IllegalStateException(e);
}
commandInvocationIdField = commandInvocationId;
generateIdMethod = generateId;
}
private Object sessionTransactionId;
public BeginInvalidationCommand() { public BeginInvalidationCommand() {
} }
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, Object[] keys, Object lockOwner) { public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, Object[] keys, Address address, Object sessionTransactionId) {
super(notifier, flags, keys); super();
this.lockOwner = lockOwner; this.notifier = notifier;
this.flags = flags;
this.keys = keys;
this.sessionTransactionId = sessionTransactionId;
if (commandInvocationIdField != null) {
try {
commandInvocationIdField.set(this, generateIdMethod.invoke(null, address));
}
catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
catch (InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
} }
public Object getLockOwner() { public Object getSessionTransactionId() {
return lockOwner; return sessionTransactionId;
} }
@Override @Override
public Object[] getParameters() { public Object[] getParameters() {
Object commandInvocationId = null;
if (commandInvocationIdField != null) {
try {
commandInvocationId = commandInvocationIdField.get(this);
}
catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
if (keys == null || keys.length == 0) { if (keys == null || keys.length == 0) {
return new Object[]{0, lockOwner}; return new Object[]{flags, sessionTransactionId, commandInvocationId, 0};
} }
if (keys.length == 1) { if (keys.length == 1) {
return new Object[]{1, keys[0], lockOwner}; return new Object[]{flags, sessionTransactionId, commandInvocationId, 1, keys[0]};
} }
Object[] retval = new Object[keys.length + 2]; Object[] retval = new Object[keys.length + 4];
retval[0] = keys.length; retval[0] = flags;
System.arraycopy(keys, 0, retval, 1, keys.length); retval[1] = sessionTransactionId;
retval[2] = commandInvocationId;
retval[3] = keys.length;
System.arraycopy(keys, 0, retval, 4, keys.length);
return retval; return retval;
} }
@ -51,15 +110,25 @@ public class BeginInvalidationCommand extends InvalidateCommand {
if (commandId != CacheCommandIds.BEGIN_INVALIDATION) { if (commandId != CacheCommandIds.BEGIN_INVALIDATION) {
throw new IllegalStateException("Invalid method id"); throw new IllegalStateException("Invalid method id");
} }
int size = (Integer) args[0]; flags = (Set<Flag>) args[0];
sessionTransactionId = args[1];
Object commandInvocationId = args[2];
if (commandInvocationIdField != null) {
try {
commandInvocationIdField.set(this, commandInvocationId);
}
catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
int size = (Integer) args[3];
keys = new Object[size]; keys = new Object[size];
if (size == 1) { if (size == 1) {
keys[0] = args[1]; keys[0] = args[4];
} }
else if (size > 0) { else if (size > 0) {
System.arraycopy(args, 1, keys, 0, size); System.arraycopy(args, 4, keys, 0, size);
} }
lockOwner = args[args.length - 1];
} }
@ -75,7 +144,7 @@ public class BeginInvalidationCommand extends InvalidateCommand {
} }
if (o instanceof BeginInvalidationCommand) { if (o instanceof BeginInvalidationCommand) {
BeginInvalidationCommand bic = (BeginInvalidationCommand) o; BeginInvalidationCommand bic = (BeginInvalidationCommand) o;
return EqualsHelper.equals(lockOwner, bic.lockOwner); return EqualsHelper.equals(sessionTransactionId, bic.sessionTransactionId);
} }
else { else {
return false; return false;
@ -84,12 +153,12 @@ public class BeginInvalidationCommand extends InvalidateCommand {
@Override @Override
public int hashCode() { public int hashCode() {
return super.hashCode() + (lockOwner == null ? 0 : lockOwner.hashCode()); return super.hashCode() + (sessionTransactionId == null ? 0 : sessionTransactionId.hashCode());
} }
@Override @Override
public String toString() { public String toString() {
return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) + return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) +
", lockOwner=" + lockOwner + '}'; ", sessionTransactionId=" + sessionTransactionId + '}';
} }
} }

View File

@ -12,6 +12,7 @@ import org.infinispan.commands.module.ModuleCommandInitializer;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.notifications.cachelistener.CacheNotifier; import org.infinispan.notifications.cachelistener.CacheNotifier;
import java.util.Set; import java.util.Set;
@ -29,11 +30,13 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
= new ConcurrentHashMap<String, PutFromLoadValidator>(); = new ConcurrentHashMap<String, PutFromLoadValidator>();
private CacheNotifier notifier; private CacheNotifier notifier;
private Configuration configuration; private Configuration configuration;
private ClusteringDependentLogic clusteringDependentLogic;
@Inject @Inject
public void injectDependencies(CacheNotifier notifier, Configuration configuration) { public void injectDependencies(CacheNotifier notifier, Configuration configuration, ClusteringDependentLogic clusteringDependentLogic) {
this.notifier = notifier; this.notifier = notifier;
this.configuration = configuration; this.configuration = configuration;
this.clusteringDependentLogic = clusteringDependentLogic;
} }
public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) { public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) {
@ -62,12 +65,12 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
return new EvictAllCommand( regionName ); return new EvictAllCommand( regionName );
} }
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object lockOwner) { public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object sessionTransactionId) {
return new BeginInvalidationCommand(notifier, flags, keys, lockOwner); return new BeginInvalidationCommand(notifier, flags, keys, clusteringDependentLogic.getAddress(), sessionTransactionId);
} }
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) {
return new EndInvalidationCommand( cacheName, keys, lockOwner ); return new EndInvalidationCommand( cacheName, keys, sessionTransactionId );
} }
@Override @Override

View File

@ -20,7 +20,7 @@ import java.util.Arrays;
*/ */
public class EndInvalidationCommand extends BaseRpcCommand { public class EndInvalidationCommand extends BaseRpcCommand {
private Object[] keys; private Object[] keys;
private Object lockOwner; private Object sessionTransactionId;
private PutFromLoadValidator putFromLoadValidator; private PutFromLoadValidator putFromLoadValidator;
public EndInvalidationCommand(String cacheName) { public EndInvalidationCommand(String cacheName) {
@ -30,16 +30,16 @@ public class EndInvalidationCommand extends BaseRpcCommand {
/** /**
* @param cacheName name of the cache to evict * @param cacheName name of the cache to evict
*/ */
public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { public EndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) {
super(cacheName); super(cacheName);
this.keys = keys; this.keys = keys;
this.lockOwner = lockOwner; this.sessionTransactionId = sessionTransactionId;
} }
@Override @Override
public Object perform(InvocationContext ctx) throws Throwable { public Object perform(InvocationContext ctx) throws Throwable {
for (Object key : keys) { for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(lockOwner, key); putFromLoadValidator.endInvalidatingKey(sessionTransactionId, key);
} }
return null; return null;
} }
@ -51,13 +51,13 @@ public class EndInvalidationCommand extends BaseRpcCommand {
@Override @Override
public Object[] getParameters() { public Object[] getParameters() {
return new Object[] { keys, lockOwner }; return new Object[] { keys, sessionTransactionId};
} }
@Override @Override
public void setParameters(int commandId, Object[] parameters) { public void setParameters(int commandId, Object[] parameters) {
keys = (Object[]) parameters[0]; keys = (Object[]) parameters[0];
lockOwner = parameters[1]; sessionTransactionId = parameters[1];
} }
@Override @Override
@ -74,12 +74,41 @@ public class EndInvalidationCommand extends BaseRpcCommand {
this.putFromLoadValidator = putFromLoadValidator; this.putFromLoadValidator = putFromLoadValidator;
} }
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof EndInvalidationCommand)) {
return false;
}
EndInvalidationCommand that = (EndInvalidationCommand) o;
if (cacheName == null ? cacheName != null : !cacheName.equals(that.cacheName)) {
return false;
}
if (!Arrays.equals(keys, that.keys)) {
return false;
}
return !(sessionTransactionId != null ? !sessionTransactionId.equals(that.sessionTransactionId) : that.sessionTransactionId != null);
}
@Override
public int hashCode() {
int result = cacheName != null ? cacheName.hashCode() : 0;
result = 31 * result + (keys != null ? Arrays.hashCode(keys) : 0);
result = 31 * result + (sessionTransactionId != null ? sessionTransactionId.hashCode() : 0);
return result;
}
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder("EndInvalidationCommand{"); final StringBuilder sb = new StringBuilder("EndInvalidationCommand{");
sb.append("cacheName=").append(cacheName); sb.append("cacheName=").append(cacheName);
sb.append(", keys=").append(Arrays.toString(keys)); sb.append(", keys=").append(Arrays.toString(keys));
sb.append(", lockOwner=").append(lockOwner); sb.append(", sessionTransactionId=").append(sessionTransactionId);
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }

View File

@ -0,0 +1,65 @@
package org.hibernate.test.cache.infinispan.util;
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.distribution.TestAddress;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Supplier;
import static org.jgroups.util.Util.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class CacheCommandsInitializerTest {
private static CacheCommandInitializer initializer = new CacheCommandInitializer();
@BeforeClass
public static void setUp() {
ClusteringDependentLogic cdl = mock(ClusteringDependentLogic.class);
when(cdl.getAddress()).thenReturn(new TestAddress(0));
initializer.injectDependencies(null, null, cdl);
}
@Test
public void testBeginInvalidationCommand1() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{}, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testBeginInvalidationCommand2() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 1 }, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testBeginInvalidationCommand3() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 2, 3 }, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testEndInvalidationCommmand() {
EndInvalidationCommand command = initializer.buildEndInvalidationCommand("foo", new Object[] { 2, 3 }, UUID.randomUUID());
checkParameters(command, () -> new EndInvalidationCommand("foo"));
}
protected <T extends ReplicableCommand> void checkParameters(T command, Supplier<T> commandSupplier) {
Object[] parameters = command.getParameters();
ReplicableCommand newCommand = commandSupplier.get();
newCommand.setParameters(command.getCommandId(), parameters);
assertEquals(command, newCommand);
assertArrayEquals(parameters, newCommand.getParameters());
}
}