HHH-11372 Do not send RemoveExpiredCommands in repl/dist caches

This commit is contained in:
Radim Vansa 2017-01-06 18:12:10 +01:00 committed by Galder Zamarreño
parent c6b6d9d242
commit 1ba6e00e00
1 changed files with 25 additions and 0 deletions

View File

@ -32,6 +32,9 @@ import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.expiration.ExpirationManager;
import org.infinispan.expiration.impl.ClusterExpirationManager;
import org.infinispan.expiration.impl.ExpirationManagerImpl;
import org.infinispan.filter.KeyValueFilter; import org.infinispan.filter.KeyValueFilter;
import org.infinispan.interceptors.CallInterceptor; import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.interceptors.EntryWrappingInterceptor; import org.infinispan.interceptors.EntryWrappingInterceptor;
@ -156,6 +159,7 @@ public abstract class BaseTransactionalDataRegion
} }
replaceCommonInterceptors(); replaceCommonInterceptors();
replaceExpirationManager();
cache.removeInterceptor(CallInterceptor.class); cache.removeInterceptor(CallInterceptor.class);
VersionedCallInterceptor tombstoneCallInterceptor = new VersionedCallInterceptor(this, metadata.getVersionComparator()); VersionedCallInterceptor tombstoneCallInterceptor = new VersionedCallInterceptor(this, metadata.getVersionComparator());
@ -177,6 +181,7 @@ public abstract class BaseTransactionalDataRegion
} }
replaceCommonInterceptors(); replaceCommonInterceptors();
replaceExpirationManager();
cache.removeInterceptor(CallInterceptor.class); cache.removeInterceptor(CallInterceptor.class);
TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(this); TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(this);
@ -218,6 +223,26 @@ public abstract class BaseTransactionalDataRegion
} }
} }
private void replaceExpirationManager() {
// ClusteredExpirationManager sends RemoteExpirationCommands to remote nodes which causes
// undesired overhead. When get() triggers a RemoteExpirationCommand executed in async executor
// this locks the entry for the duration of RPC, and putFromLoad with ZERO_LOCK_ACQUISITION_TIMEOUT
// fails as it finds the entry being blocked.
ExpirationManager expirationManager = cache.getComponentRegistry().getComponent(ExpirationManager.class);
if ((expirationManager instanceof ClusterExpirationManager)) {
// re-registering component does not stop the old one
((ClusterExpirationManager) expirationManager).stop();
cache.getComponentRegistry().registerComponent(new ExpirationManagerImpl<>(), ExpirationManager.class);
cache.getComponentRegistry().rewire();
}
else if (expirationManager instanceof ExpirationManagerImpl) {
// do nothing
}
else {
throw new IllegalStateException("Expected clustered expiration manager, found " + expirationManager);
}
}
public long getTombstoneExpiration() { public long getTombstoneExpiration() {
return tombstoneExpiration; return tombstoneExpiration;
} }