diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
index e20d0338b3..cf218fa1e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/BrokerBalancerConfiguration.java
@@ -26,7 +26,7 @@ public class BrokerBalancerConfiguration implements Serializable {
private TargetKey targetKey = TargetKey.SOURCE_IP;
private String targetKeyFilter = null;
private String localTargetFilter = null;
- private int cacheTimeout = -1;
+ private CacheConfiguration cacheConfiguration = null;
private PoolConfiguration poolConfiguration = null;
private NamedPropertyConfiguration policyConfiguration = null;
private NamedPropertyConfiguration transformerConfiguration = null;
@@ -67,12 +67,12 @@ public class BrokerBalancerConfiguration implements Serializable {
return this;
}
- public int getCacheTimeout() {
- return cacheTimeout;
+ public CacheConfiguration getCacheConfiguration() {
+ return cacheConfiguration;
}
- public BrokerBalancerConfiguration setCacheTimeout(int cacheTimeout) {
- this.cacheTimeout = cacheTimeout;
+ public BrokerBalancerConfiguration setCacheConfiguration(CacheConfiguration cacheConfiguration) {
+ this.cacheConfiguration = cacheConfiguration;
return this;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
new file mode 100644
index 0000000000..f3bf22a19c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/CacheConfiguration.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.balancing;
+
+import java.io.Serializable;
+
+public class CacheConfiguration implements Serializable {
+ private boolean persisted = false;
+
+ private int timeout = 0;
+
+ public CacheConfiguration() {
+ }
+
+ public boolean isPersisted() {
+ return persisted;
+ }
+
+ public CacheConfiguration setPersisted(boolean persisted) {
+ this.persisted = persisted;
+ return this;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public CacheConfiguration setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 50a5135bc6..907b1705af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -2653,9 +2654,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
brokerBalancerConfiguration.setLocalTargetFilter(getString(e, "local-target-filter", brokerBalancerConfiguration.getLocalTargetFilter(), Validators.NO_CHECK));
- brokerBalancerConfiguration.setCacheTimeout(getInteger(e, "cache-timeout",
- brokerBalancerConfiguration.getCacheTimeout(), Validators.MINUS_ONE_OR_GE_ZERO));
-
NamedPropertyConfiguration policyConfiguration = null;
PoolConfiguration poolConfiguration = null;
NodeList children = e.getChildNodes();
@@ -2663,7 +2661,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
for (int j = 0; j < children.getLength(); j++) {
Node child = children.item(j);
- if (child.getNodeName().equals("policy")) {
+ if (child.getNodeName().equals("cache")) {
+ CacheConfiguration cacheConfiguration = new CacheConfiguration();
+ parseCacheConfiguration((Element) child, cacheConfiguration);
+ brokerBalancerConfiguration.setCacheConfiguration(cacheConfiguration);
+ } else if (child.getNodeName().equals("policy")) {
policyConfiguration = new NamedPropertyConfiguration();
parsePolicyConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
@@ -2681,6 +2683,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.getBalancerConfigurations().add(brokerBalancerConfiguration);
}
+ private void parseCacheConfiguration(final Element e, final CacheConfiguration cacheConfiguration) throws ClassNotFoundException {
+ cacheConfiguration.setPersisted(getBoolean(e, "persisted",
+ cacheConfiguration.isPersisted()));
+
+ cacheConfiguration.setTimeout(getInteger(e, "timeout",
+ cacheConfiguration.getTimeout(), Validators.GE_ZERO));
+ }
+
private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 748f682685..ee747a6296 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -383,6 +384,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
Map getPersistedRoles();
+ void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception;
+
+ void deleteKeyValuePair(String mapId, String key) throws Exception;
+
+ Map getPersistedKeyValuePairs(String mapId);
+
/**
* @return The ID with the stored counter
*/
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
new file mode 100644
index 0000000000..0bfb7784ae
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.config;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.BufferHelper;
+
+public class PersistedKeyValuePair implements EncodingSupport {
+
+ private long storeId;
+
+ private String mapId;
+
+ private String key;
+
+ private String value;
+
+ public PersistedKeyValuePair() {
+ }
+
+ public PersistedKeyValuePair(String mapId, String key, String value) {
+ this.mapId = mapId;
+ this.key = key;
+ this.value = value;
+ }
+
+ public void setStoreId(long id) {
+ this.storeId = id;
+ }
+
+ public long getStoreId() {
+ return storeId;
+ }
+
+ public String getMapId() {
+ return mapId;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public int getEncodeSize() {
+ int size = 0;
+ size += BufferHelper.sizeOfString(mapId);
+ size += BufferHelper.sizeOfString(key);
+ size += BufferHelper.sizeOfString(value);
+ return size;
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ buffer.writeString(mapId);
+ buffer.writeString(key);
+ buffer.writeString(value);
+ }
+
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+ mapId = buffer.readString();
+ key = buffer.readString();
+ value = buffer.readString();
+ }
+
+ @Override
+ public String toString() {
+ return "PersistedKeyValuePair [storeId=" + storeId +
+ ", mapId=" +
+ mapId +
+ ", key=" +
+ key +
+ ", value=" +
+ value +
+ "]";
+ }
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index b7d3101510..1c5532ea73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -76,6 +76,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -220,6 +221,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final Map mapPersistedRoles = new ConcurrentHashMap<>();
+ protected final Map> mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
+
protected final ConcurrentLongHashMap largeMessagesToDelete = new ConcurrentLongHashMap<>();
public AbstractJournalStorageManager(final Configuration config,
@@ -818,6 +821,41 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return new HashMap<>(mapPersistedRoles);
}
+ @Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
+ deleteKeyValuePair(persistedKeyValuePair.getMapId(), persistedKeyValuePair.getKey());
+ try (ArtemisCloseable lock = closeableReadLock()) {
+ final long id = idGenerator.generateID();
+ persistedKeyValuePair.setStoreId(id);
+ bindingsJournal.appendAddRecord(id, JournalRecordIds.KEY_VALUE_PAIR_RECORD, persistedKeyValuePair, true);
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
+ if (persistedKeyValuePairs == null) {
+ persistedKeyValuePairs = new HashMap<>();
+ mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(), persistedKeyValuePairs);
+ }
+ persistedKeyValuePairs.put(persistedKeyValuePair.getKey(), persistedKeyValuePair);
+ }
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
+ if (persistedKeyValuePairs != null) {
+ PersistedKeyValuePair oldMapStringEntry = persistedKeyValuePairs.remove(key);
+ if (oldMapStringEntry != null) {
+ try (ArtemisCloseable lock = closeableReadLock()) {
+ bindingsJournal.tryAppendDeleteRecord(oldMapStringEntry.getStoreId(), this::recordNotFoundCallback, false);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Map getPersistedKeyValuePairs(String mapId) {
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
+ return persistedKeyValuePairs != null ? new HashMap<>(persistedKeyValuePairs) : new HashMap<>();
+ }
+
@Override
public void storeID(final long journalID, final long id) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
@@ -1534,6 +1572,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} else if (rec == JournalRecordIds.ROLE_RECORD) {
PersistedRole role = newRoleEncoding(id, buffer);
mapPersistedRoles.put(role.getUsername(), role);
+ } else if (rec == JournalRecordIds.KEY_VALUE_PAIR_RECORD) {
+ PersistedKeyValuePair keyValuePair = newKeyValuePairEncoding(id, buffer);
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(keyValuePair.getMapId());
+ if (persistedKeyValuePairs == null) {
+ persistedKeyValuePairs = new HashMap<>();
+ mapPersistedKeyValuePairs.put(keyValuePair.getMapId(), persistedKeyValuePairs);
+ }
+ persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
@@ -2046,6 +2092,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return persistedRole;
}
+ static PersistedKeyValuePair newKeyValuePairEncoding(long id, ActiveMQBuffer buffer) {
+ PersistedKeyValuePair persistedKeyValuePair = new PersistedKeyValuePair();
+ persistedKeyValuePair.decode(buffer);
+ persistedKeyValuePair.setStoreId(id);
+ return persistedKeyValuePair;
+ }
+
/**
* @param id
* @param buffer
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 7bd371fb4e..d11c61948b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -99,4 +99,5 @@ public final class JournalRecordIds {
// Used to record the large message body on the journal when history is on
public static final byte ADD_MESSAGE_BODY = 49;
+ public static final byte KEY_VALUE_PAIR_RECORD = 50;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 61a95c8001..b1e99e92d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -488,6 +489,21 @@ public class NullStorageManager implements StorageManager {
return null;
}
+ @Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
+
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+
+ }
+
+ @Override
+ public Map getPersistedKeyValuePairs(String mapId) {
+ return null;
+ }
+
@Override
public void storeSecuritySetting(final PersistedSecuritySetting persistedRoles) throws Exception {
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
index ae43e11cc1..55f8c57166 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
@@ -17,10 +17,9 @@
package org.apache.activemq.artemis.core.server.balancing;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
@@ -32,7 +31,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class BrokerBalancer implements ActiveMQComponent {
@@ -57,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final KeyTransformer transformer;
- private final Cache cache;
+ private final Cache cache;
private volatile boolean started = false;
@@ -85,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return policy;
}
- public Cache getCache() {
+ public Cache getCache() {
return cache;
}
@@ -100,10 +98,10 @@ public class BrokerBalancer implements ActiveMQComponent {
final String targetKeyFilter,
final Target localTarget,
final String localTargetFilter,
+ final Cache cache,
final Pool pool,
final Policy policy,
- KeyTransformer transformer,
- final int cacheTimeout) {
+ KeyTransformer transformer) {
this.name = name;
this.targetKey = targetKey;
@@ -120,17 +118,15 @@ public class BrokerBalancer implements ActiveMQComponent {
this.policy = policy;
- if (cacheTimeout == -1) {
- this.cache = CacheBuilder.newBuilder().build();
- } else if (cacheTimeout > 0) {
- this.cache = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS).build();
- } else {
- this.cache = null;
- }
+ this.cache = cache;
}
@Override
public void start() throws Exception {
+ if (cache != null) {
+ cache.start();
+ }
+
if (pool != null) {
pool.start();
}
@@ -145,6 +141,10 @@ public class BrokerBalancer implements ActiveMQComponent {
if (pool != null) {
pool.stop();
}
+
+ if (cache != null) {
+ cache.stop();
+ }
}
public TargetResult getTarget(Connection connection, String clientID, String username) {
@@ -176,20 +176,25 @@ public class BrokerBalancer implements ActiveMQComponent {
TargetResult result = null;
if (cache != null) {
- result = cache.getIfPresent(key);
- }
-
- if (result != null) {
- if (pool.isTargetReady(result.getTarget())) {
- if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + result.getTarget() + "] ready for " + targetKey + "[" + key + "]");
- }
-
- return result;
- }
+ String nodeId = cache.get(key);
if (logger.isDebugEnabled()) {
- logger.debug("The cache returns [" + result.getTarget() + "] not ready for " + targetKey + "[" + key + "]");
+ logger.debug("The cache returns target [" + nodeId + "] for " + targetKey + "[" + key + "]");
+ }
+
+ if (nodeId != null) {
+ Target target = pool.getReadyTarget(nodeId);
+ if (target != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The target [" + nodeId + "] is ready for " + targetKey + "[" + key + "]");
+ }
+
+ return new TargetResult(target);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("The target [" + nodeId + "] is not ready for " + targetKey + "[" + key + "]");
+ }
}
}
@@ -207,7 +212,7 @@ public class BrokerBalancer implements ActiveMQComponent {
if (logger.isDebugEnabled()) {
logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
}
- cache.put(key, result);
+ cache.put(key, target.getNodeID());
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
index f9b80024b0..72b99d125f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java
@@ -21,11 +21,14 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
+import org.apache.activemq.artemis.core.server.balancing.caches.LocalCache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
@@ -54,6 +57,8 @@ import java.util.concurrent.ScheduledExecutorService;
public final class BrokerBalancerManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(BrokerBalancerManager.class);
+ public static final String CACHE_ID_PREFIX = "$.BC.";
+
private final Configuration config;
@@ -91,6 +96,13 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
Target localTarget = new LocalTarget(null, server);
+
+ Cache cache = null;
+ CacheConfiguration cacheConfiguration = config.getCacheConfiguration();
+ if (cacheConfiguration != null) {
+ cache = deployCache(cacheConfiguration, config.getName());
+ }
+
Pool pool = null;
final PoolConfiguration poolConfiguration = config.getPoolConfiguration();
if (poolConfiguration != null) {
@@ -110,13 +122,20 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
}
BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
- localTarget, config.getLocalTargetFilter(), pool, policy, transformer, config.getCacheTimeout());
+ localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer);
balancerControllers.put(balancer.getName(), balancer);
server.getManagementService().registerBrokerBalancer(balancer);
}
+ private Cache deployCache(CacheConfiguration configuration, String name) throws ClassNotFoundException {
+ Cache cache = new LocalCache(CACHE_ID_PREFIX + name, configuration.isPersisted(),
+ configuration.getTimeout(), server.getStorageManager());
+
+ return cache;
+ }
+
private Pool deployPool(PoolConfiguration config, Target localTarget) throws Exception {
Pool pool;
TargetFactory targetFactory = new ActiveMQTargetFactory();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
new file mode 100644
index 0000000000..58b1b1d345
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/Cache.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+public interface Cache {
+
+ void start();
+
+ void stop();
+
+ String get(String key);
+
+ void put(String key, String nodeId);
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
new file mode 100644
index 0000000000..94113f2917
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCache.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
+import org.jboss.logging.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class LocalCache implements Cache, RemovalListener {
+ private static final Logger logger = Logger.getLogger(LocalCache.class);
+
+ private String id;
+ private boolean persisted;
+ private int timeout;
+ private StorageManager storageManager;
+ private com.google.common.cache.Cache cache;
+ private Map persistedCacheEntries;
+
+ private volatile boolean running;
+
+ public String getId() {
+ return id;
+ }
+
+ public boolean isPersisted() {
+ return persisted;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public LocalCache(String id, boolean persisted, int timeout, StorageManager storageManager) {
+ this.id = id;
+ this.persisted = persisted;
+ this.timeout = timeout;
+ this.storageManager = storageManager;
+
+ if (timeout == 0) {
+ cache = CacheBuilder.newBuilder().build();
+ } else {
+ cache = CacheBuilder.newBuilder().removalListener(this).expireAfterAccess(timeout, TimeUnit.MILLISECONDS).build();
+ }
+ }
+
+
+ @Override
+ public void start() {
+ if (persisted) {
+ persistedCacheEntries = storageManager.getPersistedKeyValuePairs(id);
+
+ for (Map.Entry cacheEntry : persistedCacheEntries.entrySet()) {
+ cache.put(cacheEntry.getKey(), cacheEntry.getValue().getValue());
+ logger.info(cacheEntry.toString());
+ }
+ }
+
+ running = true;
+ }
+
+ @Override
+ public void stop() {
+ cache.cleanUp();
+
+ if (persistedCacheEntries != null) {
+ persistedCacheEntries.clear();
+ }
+
+ running = false;
+ }
+
+ @Override
+ public String get(String key) {
+ return cache.getIfPresent(key);
+ }
+
+ @Override
+ public void put(String key, String nodeId) {
+ if (persisted) {
+ PersistedKeyValuePair persistedKeyValuePair = persistedCacheEntries.get(key);
+
+ if (persistedKeyValuePair == null || !Objects.equals(nodeId, persistedKeyValuePair.getValue())) {
+ persistedKeyValuePair = new PersistedKeyValuePair(id, key, nodeId);
+
+ try {
+ storageManager.storeKeyValuePair(persistedKeyValuePair);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ persistedCacheEntries.put(key, persistedKeyValuePair);
+ }
+ }
+
+ cache.put(key, nodeId);
+ }
+
+ @Override
+ public void onRemoval(RemovalNotification notification) {
+ if (running && persisted) {
+ PersistedKeyValuePair persistedKeyValuePair = persistedCacheEntries.remove(notification.getKey());
+
+ if (persistedKeyValuePair != null) {
+ try {
+ storageManager.deleteKeyValuePair(persistedKeyValuePair.getMapId(), persistedKeyValuePair.getKey());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
index 5ab614505c..78606253b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/AbstractPool.java
@@ -175,6 +175,27 @@ public abstract class AbstractPool implements Pool {
return targetMonitor != null ? targetMonitor.isTargetReady() : false;
}
+ @Override
+ public Target getReadyTarget(String nodeId) {
+ int readyTargets;
+ long deadline = quorumTimeout > 0 ? System.nanoTime() + quorumTimeoutNanos : 0;
+
+ do {
+ readyTargets = 0;
+ for (TargetMonitor targetMonitor : targetMonitors) {
+ if (targetMonitor.isTargetReady()) {
+ readyTargets++;
+ if (nodeId.equals(targetMonitor.getTarget().getNodeID())) {
+ return targetMonitor.getTarget();
+ }
+ }
+ }
+ }
+ while(readyTargets < quorumSize && deadline > 0 && (System.nanoTime() - deadline) < 0);
+
+ return null;
+ }
+
@Override
public void addTargetProbe(TargetProbe probe) {
targetProbes.add(probe);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
index db6b733147..43e32149f3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/pools/Pool.java
@@ -46,6 +46,8 @@ public interface Pool extends ActiveMQComponent {
Target getTarget(String nodeId);
+ Target getReadyTarget(String nodeId);
+
boolean isTargetReady(Target target);
List getTargets();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
index 0fc1cadc64..adbd426174 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/ActiveMQTarget.java
@@ -69,6 +69,10 @@ public class ActiveMQTarget extends AbstractTarget implements FailureListener {
false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE,
BrokerBalancer.CLIENT_ID_PREFIX + UUIDGenerator.getInstance().generateStringUUID()).start());
+ if (getNodeID() == null) {
+ setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class, 3000));
+ }
+
connected = true;
fireConnectedEvent();
@@ -92,10 +96,6 @@ public class ActiveMQTarget extends AbstractTarget implements FailureListener {
@Override
public boolean checkReadiness() {
try {
- if (getNodeID() == null) {
- setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class, 3000));
- }
-
return getAttribute(ResourceNames.BROKER, "Active", Boolean.class, 3000);
} catch (Exception e) {
logger.warn("Error on check readiness", e);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index f14317457f..8f8a60f606 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2123,7 +2123,7 @@
-
+
the time period for a cache entry to remain active
@@ -2171,6 +2171,26 @@
+
+
+
+
+
+ true means that the cache entries are persisted
+
+
+
+
+
+
+ the timeout (in milliseconds) before removing cache entries
+
+
+
+
+
+
+
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 603f0826ba..7a21cfec85 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -301,7 +301,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
} else {
Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP);
Assert.assertEquals("least-connections-balancer", bc.getName());
- Assert.assertEquals(60000, bc.getCacheTimeout());
+ Assert.assertNotNull(bc.getCacheConfiguration());
+ Assert.assertEquals(true, bc.getCacheConfiguration().isPersisted());
+ Assert.assertEquals(60000, bc.getCacheConfiguration().getTimeout());
Assert.assertEquals(bc.getPolicyConfiguration().getName(), LeastConnectionsPolicy.NAME);
Assert.assertEquals(3000, bc.getPoolConfiguration().getCheckPeriod());
Assert.assertEquals(2, bc.getPoolConfiguration().getQuorumSize());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
index 732ea530a8..fc6acc9a2b 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java
@@ -53,7 +53,7 @@ public class BrokerBalancerTest {
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", pool, policy, null, 0);
+ localTarget, "^FOO.*", null, pool, policy, null);
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}
@@ -69,7 +69,7 @@ public class BrokerBalancerTest {
}
};
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", pool, policy, keyTransformer, 0);
+ localTarget, "^FOO.*", null, pool, policy, keyTransformer);
assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
new file mode 100644
index 0000000000..5fc3f3b505
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/caches/LocalCacheTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.balancing.caches;
+
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class LocalCacheTest {
+ private static final String CACHE_NAME = "TEST";
+ private static final int CACHE_TIMEOUT = 500;
+ private static final String CACHE_ENTRY_KEY = "TEST_KEY";
+ private static final String CACHE_ENTRY_VALUE = "TEST_VALUE";
+
+ @Test
+ public void testValidEntry() {
+ LocalCache cache = new LocalCache(CACHE_NAME, false, 0, null);
+
+ cache.start();
+
+ try {
+ cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
+ } finally {
+ cache.stop();
+ }
+ }
+
+ @Test
+ public void testExpiration() throws Exception {
+ LocalCache cache = new LocalCache(CACHE_NAME, false, CACHE_TIMEOUT, null);
+
+ cache.start();
+
+ try {
+ cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
+ Wait.assertTrue(() -> cache.get(CACHE_ENTRY_KEY) == null, CACHE_TIMEOUT * 2, CACHE_TIMEOUT);
+ } finally {
+ cache.stop();
+ }
+ }
+
+ @Test
+ public void testPersistedEntry() {
+ StorageManager storageManager = new DummyKeyValuePairStorageManager();
+
+ LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, 0, storageManager);
+
+ cacheBeforeStop.start();
+
+ try {
+ cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cacheBeforeStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheBeforeStop.stop();
+ }
+
+ Assert.assertEquals(CACHE_ENTRY_VALUE, storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+
+ LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, 0, storageManager);
+
+ cacheAfterStop.start();
+
+ try {
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cacheAfterStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheAfterStop.stop();
+ }
+
+ Assert.assertEquals(CACHE_ENTRY_VALUE, storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+ }
+
+ @Test
+ public void testPersistedExpiration() throws Exception {
+ StorageManager storageManager = new DummyKeyValuePairStorageManager();
+
+ LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, CACHE_TIMEOUT, storageManager);
+
+ cacheBeforeStop.start();
+
+ try {
+ cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cacheBeforeStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheBeforeStop.stop();
+ }
+
+ Assert.assertEquals(CACHE_ENTRY_VALUE, storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
+
+ LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, CACHE_TIMEOUT, storageManager);
+
+ cacheAfterStop.start();
+
+ try {
+ Assert.assertEquals(CACHE_ENTRY_VALUE, cacheAfterStop.get(CACHE_ENTRY_KEY));
+ Thread.sleep(CACHE_TIMEOUT * 2);
+ Assert.assertNull(cacheAfterStop.get(CACHE_ENTRY_KEY));
+ } finally {
+ cacheAfterStop.stop();
+ }
+
+ Assert.assertNull(storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY));
+ }
+
+ static class DummyKeyValuePairStorageManager extends NullStorageManager {
+ private Map> mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
+
+ @Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
+ if (persistedKeyValuePairs == null) {
+ persistedKeyValuePairs = new HashMap<>();
+ mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(), persistedKeyValuePairs);
+ }
+ persistedKeyValuePairs.put(persistedKeyValuePair.getKey(), persistedKeyValuePair);
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
+ if (persistedKeyValuePairs != null) {
+ persistedKeyValuePairs.remove(key);
+ }
+ }
+
+ @Override
+ public Map getPersistedKeyValuePairs(String mapId) {
+ Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
+ return persistedKeyValuePairs != null ? new HashMap<>(persistedKeyValuePairs) : new HashMap<>();
+ }
+ }
+}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 77ba7ee81c..4ceee87087 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -666,6 +667,21 @@ public class TransactionImplTest extends ActiveMQTestBase {
return null;
}
+ @Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
+
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+
+ }
+
+ @Override
+ public Map getPersistedKeyValuePairs(String mapId) {
+ return null;
+ }
+
@Override
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return 0;
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 6764e250bd..c331dfaa9d 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -188,7 +188,10 @@
- 60000
+
+ true
+ 60000
+
3000
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index aee5fb2456..70cdde0081 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -179,7 +179,10 @@
- 60000
+
+ true
+ 60000
+
3000
diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md
index c821877500..4fbc0a698b 100644
--- a/docs/user-manual/en/broker-balancers.md
+++ b/docs/user-manual/en/broker-balancers.md
@@ -103,8 +103,19 @@ A policy is defined by the `policy` element. Let's take a look at a policy examp
The broker balancer provides a cache with a timeout to improve the stickiness of the target broker selected,
returning the same target broker for a target key as long as it is present in the cache and is ready.
So a broker balancer with the cache enabled doesn't strictly follow the configured policy.
-By default, the cache is enabled and will never timeout. See below
-for more details about setting the `cache-timeout` parameter.
+By default, the cache is not enabled.
+
+A cache is defined by the `cache` element that includes the following items:
+* the `persisted` element defines whether the cache has to persist entries, default is `false`;
+* the `timeout` element defines the timeout before removing entries, measured in milliseconds, setting 0 will disable the timeout, default is `0`.
+
+Let's take a look at a cache example from broker.xml:
+```xml
+
+ true
+ 60000
+
+```
## Key transformers
A `local-target-key-transformer` allows target key transformation before matching against any local-target-filter. One use case is
@@ -119,9 +130,8 @@ A broker balancer is defined by the `broker-balancer` element, it includes the f
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers);
-* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
-* the `pool` element defines the pool to group the target brokers, see [pools](#pools).
-* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies);
+* the `pool` element defines the pool to group the target brokers, see [pools](#pools);
+* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies).
Let's take a look at some broker balancer examples from broker.xml:
```xml
diff --git a/docs/user-manual/en/images/broker_balancer_workflow.png b/docs/user-manual/en/images/broker_balancer_workflow.png
index 97560b6bd8..457c5e112b 100644
Binary files a/docs/user-manual/en/images/broker_balancer_workflow.png and b/docs/user-manual/en/images/broker_balancer_workflow.png differ
diff --git a/docs/user-manual/en/images/management_api_redirect_sequence.png b/docs/user-manual/en/images/management_api_redirect_sequence.png
index 371baa8923..4f25326786 100644
Binary files a/docs/user-manual/en/images/management_api_redirect_sequence.png and b/docs/user-manual/en/images/management_api_redirect_sequence.png differ
diff --git a/docs/user-manual/en/images/native_redirect_sequence.png b/docs/user-manual/en/images/native_redirect_sequence.png
index d7466da344..e5b45bd114 100644
Binary files a/docs/user-manual/en/images/native_redirect_sequence.png and b/docs/user-manual/en/images/native_redirect_sequence.png differ
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
index c626325269..3a82854841 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
+import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@@ -151,6 +152,16 @@ public class BalancingTestBase extends ClusterTestBase {
}
+ protected void setupBalancerLocalCache(final int node, boolean persisted, int timeout) {
+
+ Configuration configuration = getServer(node).getConfiguration();
+ BrokerBalancerConfiguration brokerBalancerConfiguration = configuration.getBalancerConfigurations().stream()
+ .filter(brokerBalancerConfiguration1 -> brokerBalancerConfiguration1.getName().equals(BROKER_BALANCER_NAME)).findFirst().get();
+
+ brokerBalancerConfiguration.setCacheConfiguration(
+ new CacheConfiguration().setPersisted(persisted).setTimeout(timeout));
+ }
+
protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password) throws Exception {
return createFactory(protocol, sslEnabled, host, port, clientID, user, password, -1);
}
@@ -215,7 +226,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
- urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries + "&failover.randomize=true");
+ urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("&jms.clientID=");
@@ -243,7 +254,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
- urlBuilder.append("?startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries);
+ urlBuilder.append("?randomize=false&startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("&jms.clientID=");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
index 46dd417816..3b7b786623 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
@@ -140,15 +140,27 @@ public class RedirectTest extends BalancingTestBase {
@Test
public void testRoundRobinRedirect() throws Exception {
- testEvenlyRedirect(RoundRobinPolicy.NAME, null);
+ testEvenlyRedirect(RoundRobinPolicy.NAME, null, false);
}
@Test
public void testLeastConnectionsRedirect() throws Exception {
- testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, String.valueOf(30)));
+ testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
+ LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, String.valueOf(30)), false);
}
- private void testEvenlyRedirect(final String policyName, final Map properties) throws Exception {
+ @Test
+ public void testRoundRobinRedirectWithFailure() throws Exception {
+ testEvenlyRedirect(RoundRobinPolicy.NAME, null, true);
+ }
+
+ @Test
+ public void testLeastConnectionsRedirectWithFailure() throws Exception {
+ testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
+ LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, String.valueOf(30)), true);
+ }
+
+ private void testEvenlyRedirect(final String policyName, final Map properties, final boolean withFailure) throws Exception {
final String queueName = "RedirectTestQueue";
final int targets = MULTIPLE_TARGETS;
int[] nodes = new int[targets + 1];
@@ -174,6 +186,10 @@ public class RedirectTest extends BalancingTestBase {
setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, policyName, properties, false, null, targets, 1, 2, 3);
}
+ if (withFailure) {
+ setupBalancerLocalCache(0, true, 0);
+ }
+
startServers(nodes);
for (int node : nodes) {
@@ -214,6 +230,12 @@ public class RedirectTest extends BalancingTestBase {
Assert.assertEquals("Messages of node " + targetNode, 1, queueControls[targetNode].countMessages());
}
+ if (withFailure) {
+ crashAndWaitForFailure(getServer(0));
+
+ startServers(0);
+ }
+
for (int i = 0; i < targets; i++) {
try (Connection connection = connectionFactories[i].createConnection()) {
connection.start();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 01b1c22a13..13d176e07f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@@ -756,6 +757,21 @@ public class SendAckFailTest extends SpawnedTestBase {
return manager.getPersistedRoles();
}
+ @Override
+ public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
+ manager.storeKeyValuePair(persistedKeyValuePair);
+ }
+
+ @Override
+ public void deleteKeyValuePair(String mapId, String key) throws Exception {
+ manager.deleteKeyValuePair(mapId, key);
+ }
+
+ @Override
+ public Map getPersistedKeyValuePairs(String mapId) {
+ return manager.getPersistedKeyValuePairs(mapId);
+ }
+
@Override
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return manager.storePageCounter(txID, queueID, value, size);