ARTEMIS-3645 Support broker balancer cache persistence

This commit is contained in:
Domenico Francesco Bruscino 2022-01-14 15:07:53 +01:00 committed by Justin Bertram
parent ae7e7cbb22
commit 290e5016c8
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
29 changed files with 756 additions and 57 deletions

View File

@ -26,7 +26,7 @@ public class BrokerBalancerConfiguration implements Serializable {
private TargetKey targetKey = TargetKey.SOURCE_IP;
private String targetKeyFilter = null;
private String localTargetFilter = null;
private int cacheTimeout = -1;
private CacheConfiguration cacheConfiguration = null;
private PoolConfiguration poolConfiguration = null;
private NamedPropertyConfiguration policyConfiguration = null;
private NamedPropertyConfiguration transformerConfiguration = null;
@ -67,12 +67,12 @@ public class BrokerBalancerConfiguration implements Serializable {
return this;
}
public int getCacheTimeout() {
return cacheTimeout;
public CacheConfiguration getCacheConfiguration() {
return cacheConfiguration;
}
public BrokerBalancerConfiguration setCacheTimeout(int cacheTimeout) {
this.cacheTimeout = cacheTimeout;
public BrokerBalancerConfiguration setCacheConfiguration(CacheConfiguration cacheConfiguration) {
this.cacheConfiguration = cacheConfiguration;
return this;
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.balancing;
import java.io.Serializable;
public class CacheConfiguration implements Serializable {
private boolean persisted = false;
private int timeout = 0;
public CacheConfiguration() {
}
public boolean isPersisted() {
return persisted;
}
public CacheConfiguration setPersisted(boolean persisted) {
this.persisted = persisted;
return this;
}
public int getTimeout() {
return timeout;
}
public CacheConfiguration setTimeout(int timeout) {
this.timeout = timeout;
return this;
}
}

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -2653,9 +2654,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
brokerBalancerConfiguration.setLocalTargetFilter(getString(e, "local-target-filter", brokerBalancerConfiguration.getLocalTargetFilter(), Validators.NO_CHECK));
brokerBalancerConfiguration.setCacheTimeout(getInteger(e, "cache-timeout",
brokerBalancerConfiguration.getCacheTimeout(), Validators.MINUS_ONE_OR_GE_ZERO));
NamedPropertyConfiguration policyConfiguration = null;
PoolConfiguration poolConfiguration = null;
NodeList children = e.getChildNodes();
@ -2663,7 +2661,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
for (int j = 0; j < children.getLength(); j++) {
Node child = children.item(j);
if (child.getNodeName().equals("policy")) {
if (child.getNodeName().equals("cache")) {
CacheConfiguration cacheConfiguration = new CacheConfiguration();
parseCacheConfiguration((Element) child, cacheConfiguration);
brokerBalancerConfiguration.setCacheConfiguration(cacheConfiguration);
} else if (child.getNodeName().equals("policy")) {
policyConfiguration = new NamedPropertyConfiguration();
parsePolicyConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
@ -2681,6 +2683,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.getBalancerConfigurations().add(brokerBalancerConfiguration);
}
private void parseCacheConfiguration(final Element e, final CacheConfiguration cacheConfiguration) throws ClassNotFoundException {
cacheConfiguration.setPersisted(getBoolean(e, "persisted",
cacheConfiguration.isPersisted()));
cacheConfiguration.setTimeout(getInteger(e, "timeout",
cacheConfiguration.getTimeout(), Validators.GE_ZERO));
}
private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@ -383,6 +384,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
Map<String, PersistedRole> getPersistedRoles();
void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception;
void deleteKeyValuePair(String mapId, String key) throws Exception;
Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId);
/**
* @return The ID with the stored counter
*/

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.config;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.BufferHelper;
public class PersistedKeyValuePair implements EncodingSupport {
private long storeId;
private String mapId;
private String key;
private String value;
public PersistedKeyValuePair() {
}
public PersistedKeyValuePair(String mapId, String key, String value) {
this.mapId = mapId;
this.key = key;
this.value = value;
}
public void setStoreId(long id) {
this.storeId = id;
}
public long getStoreId() {
return storeId;
}
public String getMapId() {
return mapId;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
@Override
public int getEncodeSize() {
int size = 0;
size += BufferHelper.sizeOfString(mapId);
size += BufferHelper.sizeOfString(key);
size += BufferHelper.sizeOfString(value);
return size;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeString(mapId);
buffer.writeString(key);
buffer.writeString(value);
}
@Override
public void decode(ActiveMQBuffer buffer) {
mapId = buffer.readString();
key = buffer.readString();
value = buffer.readString();
}
@Override
public String toString() {
return "PersistedKeyValuePair [storeId=" + storeId +
", mapId=" +
mapId +
", key=" +
key +
", value=" +
value +
"]";
}
}

View File

@ -76,6 +76,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@ -220,6 +221,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final Map<String, PersistedRole> mapPersistedRoles = new ConcurrentHashMap<>();
protected final Map<String, Map<String, PersistedKeyValuePair>> mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
protected final ConcurrentLongHashMap<LargeServerMessage> largeMessagesToDelete = new ConcurrentLongHashMap<>();
public AbstractJournalStorageManager(final Configuration config,
@ -818,6 +821,41 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return new HashMap<>(mapPersistedRoles);
}
@Override
public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
deleteKeyValuePair(persistedKeyValuePair.getMapId(), persistedKeyValuePair.getKey());
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedKeyValuePair.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.KEY_VALUE_PAIR_RECORD, persistedKeyValuePair, true);
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
if (persistedKeyValuePairs == null) {
persistedKeyValuePairs = new HashMap<>();
mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(), persistedKeyValuePairs);
}
persistedKeyValuePairs.put(persistedKeyValuePair.getKey(), persistedKeyValuePair);
}
}
@Override
public void deleteKeyValuePair(String mapId, String key) throws Exception {
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
if (persistedKeyValuePairs != null) {
PersistedKeyValuePair oldMapStringEntry = persistedKeyValuePairs.remove(key);
if (oldMapStringEntry != null) {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.tryAppendDeleteRecord(oldMapStringEntry.getStoreId(), this::recordNotFoundCallback, false);
}
}
}
}
@Override
public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) {
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
return persistedKeyValuePairs != null ? new HashMap<>(persistedKeyValuePairs) : new HashMap<>();
}
@Override
public void storeID(final long journalID, final long id) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
@ -1534,6 +1572,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} else if (rec == JournalRecordIds.ROLE_RECORD) {
PersistedRole role = newRoleEncoding(id, buffer);
mapPersistedRoles.put(role.getUsername(), role);
} else if (rec == JournalRecordIds.KEY_VALUE_PAIR_RECORD) {
PersistedKeyValuePair keyValuePair = newKeyValuePairEncoding(id, buffer);
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(keyValuePair.getMapId());
if (persistedKeyValuePairs == null) {
persistedKeyValuePairs = new HashMap<>();
mapPersistedKeyValuePairs.put(keyValuePair.getMapId(), persistedKeyValuePairs);
}
persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
@ -2046,6 +2092,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return persistedRole;
}
static PersistedKeyValuePair newKeyValuePairEncoding(long id, ActiveMQBuffer buffer) {
PersistedKeyValuePair persistedKeyValuePair = new PersistedKeyValuePair();
persistedKeyValuePair.decode(buffer);
persistedKeyValuePair.setStoreId(id);
return persistedKeyValuePair;
}
/**
* @param id
* @param buffer

View File

@ -99,4 +99,5 @@ public final class JournalRecordIds {
// Used to record the large message body on the journal when history is on
public static final byte ADD_MESSAGE_BODY = 49;
public static final byte KEY_VALUE_PAIR_RECORD = 50;
}

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@ -488,6 +489,21 @@ public class NullStorageManager implements StorageManager {
return null;
}
@Override
public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
}
@Override
public void deleteKeyValuePair(String mapId, String key) throws Exception {
}
@Override
public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) {
return null;
}
@Override
public void storeSecuritySetting(final PersistedSecuritySetting persistedRoles) throws Exception {
}

View File

@ -17,10 +17,9 @@
package org.apache.activemq.artemis.core.server.balancing;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
@ -32,7 +31,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class BrokerBalancer implements ActiveMQComponent {
@ -57,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final KeyTransformer transformer;
private final Cache<String, TargetResult> cache;
private final Cache cache;
private volatile boolean started = false;
@ -85,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return policy;
}
public Cache<String, TargetResult> getCache() {
public Cache getCache() {
return cache;
}
@ -100,10 +98,10 @@ public class BrokerBalancer implements ActiveMQComponent {
final String targetKeyFilter,
final Target localTarget,
final String localTargetFilter,
final Cache cache,
final Pool pool,
final Policy policy,
KeyTransformer transformer,
final int cacheTimeout) {
KeyTransformer transformer) {
this.name = name;
this.targetKey = targetKey;
@ -120,17 +118,15 @@ public class BrokerBalancer implements ActiveMQComponent {
this.policy = policy;
if (cacheTimeout == -1) {
this.cache = CacheBuilder.newBuilder().build();
} else if (cacheTimeout > 0) {
this.cache = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS).build();
} else {
this.cache = null;
}
this.cache = cache;
}
@Override
public void start() throws Exception {
if (cache != null) {
cache.start();
}
if (pool != null) {
pool.start();
}
@ -145,6 +141,10 @@ public class BrokerBalancer implements ActiveMQComponent {
if (pool != null) {
pool.stop();
}
if (cache != null) {
cache.stop();
}
}
public TargetResult getTarget(Connection connection, String clientID, String username) {
@ -176,20 +176,25 @@ public class BrokerBalancer implements ActiveMQComponent {
TargetResult result = null;
if (cache != null) {
result = cache.getIfPresent(key);
}
if (result != null) {
if (pool.isTargetReady(result.getTarget())) {
if (logger.isDebugEnabled()) {
logger.debug("The cache returns [" + result.getTarget() + "] ready for " + targetKey + "[" + key + "]");
}
return result;
}
String nodeId = cache.get(key);
if (logger.isDebugEnabled()) {
logger.debug("The cache returns [" + result.getTarget() + "] not ready for " + targetKey + "[" + key + "]");
logger.debug("The cache returns target [" + nodeId + "] for " + targetKey + "[" + key + "]");
}
if (nodeId != null) {
Target target = pool.getReadyTarget(nodeId);
if (target != null) {
if (logger.isDebugEnabled()) {
logger.debug("The target [" + nodeId + "] is ready for " + targetKey + "[" + key + "]");
}
return new TargetResult(target);
}
if (logger.isDebugEnabled()) {
logger.debug("The target [" + nodeId + "] is not ready for " + targetKey + "[" + key + "]");
}
}
}
@ -207,7 +212,7 @@ public class BrokerBalancer implements ActiveMQComponent {
if (logger.isDebugEnabled()) {
logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
}
cache.put(key, result);
cache.put(key, target.getNodeID());
}
}

View File

@ -21,11 +21,14 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
import org.apache.activemq.artemis.core.server.balancing.caches.LocalCache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
@ -54,6 +57,8 @@ import java.util.concurrent.ScheduledExecutorService;
public final class BrokerBalancerManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(BrokerBalancerManager.class);
public static final String CACHE_ID_PREFIX = "$.BC.";
private final Configuration config;
@ -91,6 +96,13 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
Target localTarget = new LocalTarget(null, server);
Cache cache = null;
CacheConfiguration cacheConfiguration = config.getCacheConfiguration();
if (cacheConfiguration != null) {
cache = deployCache(cacheConfiguration, config.getName());
}
Pool pool = null;
final PoolConfiguration poolConfiguration = config.getPoolConfiguration();
if (poolConfiguration != null) {
@ -110,13 +122,20 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
}
BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
localTarget, config.getLocalTargetFilter(), pool, policy, transformer, config.getCacheTimeout());
localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer);
balancerControllers.put(balancer.getName(), balancer);
server.getManagementService().registerBrokerBalancer(balancer);
}
private Cache deployCache(CacheConfiguration configuration, String name) throws ClassNotFoundException {
Cache cache = new LocalCache(CACHE_ID_PREFIX + name, configuration.isPersisted(),
configuration.getTimeout(), server.getStorageManager());
return cache;
}
private Pool deployPool(PoolConfiguration config, Target localTarget) throws Exception {
Pool pool;
TargetFactory targetFactory = new ActiveMQTargetFactory();

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.caches;
public interface Cache {
void start();
void stop();
String get(String key);
void put(String key, String nodeId);
}

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.caches;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.jboss.logging.Logger;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class LocalCache implements Cache, RemovalListener<String, String> {
private static final Logger logger = Logger.getLogger(LocalCache.class);
private String id;
private boolean persisted;
private int timeout;
private StorageManager storageManager;
private com.google.common.cache.Cache<String, String> cache;
private Map<String, PersistedKeyValuePair> persistedCacheEntries;
private volatile boolean running;
public String getId() {
return id;
}
public boolean isPersisted() {
return persisted;
}
public int getTimeout() {
return timeout;
}
public LocalCache(String id, boolean persisted, int timeout, StorageManager storageManager) {
this.id = id;
this.persisted = persisted;
this.timeout = timeout;
this.storageManager = storageManager;
if (timeout == 0) {
cache = CacheBuilder.newBuilder().build();
} else {
cache = CacheBuilder.newBuilder().removalListener(this).expireAfterAccess(timeout, TimeUnit.MILLISECONDS).build();
}
}
@Override
public void start() {
if (persisted) {
persistedCacheEntries = storageManager.getPersistedKeyValuePairs(id);
for (Map.Entry<String, PersistedKeyValuePair> cacheEntry : persistedCacheEntries.entrySet()) {
cache.put(cacheEntry.getKey(), cacheEntry.getValue().getValue());
logger.info(cacheEntry.toString());
}
}
running = true;
}
@Override
public void stop() {
cache.cleanUp();
if (persistedCacheEntries != null) {
persistedCacheEntries.clear();
}
running = false;
}
@Override
public String get(String key) {
return cache.getIfPresent(key);
}
@Override
public void put(String key, String nodeId) {
if (persisted) {
PersistedKeyValuePair persistedKeyValuePair = persistedCacheEntries.get(key);
if (persistedKeyValuePair == null || !Objects.equals(nodeId, persistedKeyValuePair.getValue())) {
persistedKeyValuePair = new PersistedKeyValuePair(id, key, nodeId);
try {
storageManager.storeKeyValuePair(persistedKeyValuePair);
} catch (Exception e) {
throw new RuntimeException(e);
}
persistedCacheEntries.put(key, persistedKeyValuePair);
}
}
cache.put(key, nodeId);
}
@Override
public void onRemoval(RemovalNotification<String, String> notification) {
if (running && persisted) {
PersistedKeyValuePair persistedKeyValuePair = persistedCacheEntries.remove(notification.getKey());
if (persistedKeyValuePair != null) {
try {
storageManager.deleteKeyValuePair(persistedKeyValuePair.getMapId(), persistedKeyValuePair.getKey());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}

View File

@ -175,6 +175,27 @@ public abstract class AbstractPool implements Pool {
return targetMonitor != null ? targetMonitor.isTargetReady() : false;
}
@Override
public Target getReadyTarget(String nodeId) {
int readyTargets;
long deadline = quorumTimeout > 0 ? System.nanoTime() + quorumTimeoutNanos : 0;
do {
readyTargets = 0;
for (TargetMonitor targetMonitor : targetMonitors) {
if (targetMonitor.isTargetReady()) {
readyTargets++;
if (nodeId.equals(targetMonitor.getTarget().getNodeID())) {
return targetMonitor.getTarget();
}
}
}
}
while(readyTargets < quorumSize && deadline > 0 && (System.nanoTime() - deadline) < 0);
return null;
}
@Override
public void addTargetProbe(TargetProbe probe) {
targetProbes.add(probe);

View File

@ -46,6 +46,8 @@ public interface Pool extends ActiveMQComponent {
Target getTarget(String nodeId);
Target getReadyTarget(String nodeId);
boolean isTargetReady(Target target);
List<Target> getTargets();

View File

@ -69,6 +69,10 @@ public class ActiveMQTarget extends AbstractTarget implements FailureListener {
false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE,
BrokerBalancer.CLIENT_ID_PREFIX + UUIDGenerator.getInstance().generateStringUUID()).start());
if (getNodeID() == null) {
setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class, 3000));
}
connected = true;
fireConnectedEvent();
@ -92,10 +96,6 @@ public class ActiveMQTarget extends AbstractTarget implements FailureListener {
@Override
public boolean checkReadiness() {
try {
if (getNodeID() == null) {
setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class, 3000));
}
return getAttribute(ResourceNames.BROKER, "Active", Boolean.class, 3000);
} catch (Exception e) {
logger.warn("Error on check readiness", e);

View File

@ -2123,7 +2123,7 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="cache-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:element name="cache" type="brokerBalancerCacheType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the time period for a cache entry to remain active
@ -2171,6 +2171,26 @@
</xsd:restriction>
</xsd:simpleType>
<xsd:complexType name="brokerBalancerCacheType">
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="persisted" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
true means that the cache entries are persisted
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the timeout (in milliseconds) before removing cache entries
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="brokerBalancerPolicyType">
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">

View File

@ -301,7 +301,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
} else {
Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP);
Assert.assertEquals("least-connections-balancer", bc.getName());
Assert.assertEquals(60000, bc.getCacheTimeout());
Assert.assertNotNull(bc.getCacheConfiguration());
Assert.assertEquals(true, bc.getCacheConfiguration().isPersisted());
Assert.assertEquals(60000, bc.getCacheConfiguration().getTimeout());
Assert.assertEquals(bc.getPolicyConfiguration().getName(), LeastConnectionsPolicy.NAME);
Assert.assertEquals(3000, bc.getPoolConfiguration().getCheckPeriod());
Assert.assertEquals(2, bc.getPoolConfiguration().getQuorumSize());

View File

@ -53,7 +53,7 @@ public class BrokerBalancerTest {
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", pool, policy, null, 0);
localTarget, "^FOO.*", null, pool, policy, null);
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}
@ -69,7 +69,7 @@ public class BrokerBalancerTest {
}
};
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", pool, policy, keyTransformer, 0);
localTarget, "^FOO.*", null, pool, policy, keyTransformer);
assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.caches;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class LocalCacheTest {
private static final String CACHE_NAME = "TEST";
private static final int CACHE_TIMEOUT = 500;
private static final String CACHE_ENTRY_KEY = "TEST_KEY";
private static final String CACHE_ENTRY_VALUE = "TEST_VALUE";
@Test
public void testValidEntry() {
LocalCache cache = new LocalCache(CACHE_NAME, false, 0, null);
cache.start();
try {
cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
} finally {
cache.stop();
}
}
@Test
public void testExpiration() throws Exception {
LocalCache cache = new LocalCache(CACHE_NAME, false, CACHE_TIMEOUT, null);
cache.start();
try {
cache.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
Assert.assertEquals(CACHE_ENTRY_VALUE, cache.get(CACHE_ENTRY_KEY));
Wait.assertTrue(() -> cache.get(CACHE_ENTRY_KEY) == null, CACHE_TIMEOUT * 2, CACHE_TIMEOUT);
} finally {
cache.stop();
}
}
@Test
public void testPersistedEntry() {
StorageManager storageManager = new DummyKeyValuePairStorageManager();
LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, 0, storageManager);
cacheBeforeStop.start();
try {
cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
Assert.assertEquals(CACHE_ENTRY_VALUE, cacheBeforeStop.get(CACHE_ENTRY_KEY));
} finally {
cacheBeforeStop.stop();
}
Assert.assertEquals(CACHE_ENTRY_VALUE, storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, 0, storageManager);
cacheAfterStop.start();
try {
Assert.assertEquals(CACHE_ENTRY_VALUE, cacheAfterStop.get(CACHE_ENTRY_KEY));
} finally {
cacheAfterStop.stop();
}
Assert.assertEquals(CACHE_ENTRY_VALUE, storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
}
@Test
public void testPersistedExpiration() throws Exception {
StorageManager storageManager = new DummyKeyValuePairStorageManager();
LocalCache cacheBeforeStop = new LocalCache(CACHE_NAME, true, CACHE_TIMEOUT, storageManager);
cacheBeforeStop.start();
try {
cacheBeforeStop.put(CACHE_ENTRY_KEY, CACHE_ENTRY_VALUE);
Assert.assertEquals(CACHE_ENTRY_VALUE, cacheBeforeStop.get(CACHE_ENTRY_KEY));
} finally {
cacheBeforeStop.stop();
}
Assert.assertEquals(CACHE_ENTRY_VALUE, storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY).getValue());
LocalCache cacheAfterStop = new LocalCache(CACHE_NAME, true, CACHE_TIMEOUT, storageManager);
cacheAfterStop.start();
try {
Assert.assertEquals(CACHE_ENTRY_VALUE, cacheAfterStop.get(CACHE_ENTRY_KEY));
Thread.sleep(CACHE_TIMEOUT * 2);
Assert.assertNull(cacheAfterStop.get(CACHE_ENTRY_KEY));
} finally {
cacheAfterStop.stop();
}
Assert.assertNull(storageManager.getPersistedKeyValuePairs(CACHE_NAME).get(CACHE_ENTRY_KEY));
}
static class DummyKeyValuePairStorageManager extends NullStorageManager {
private Map<String, Map<String, PersistedKeyValuePair>> mapPersistedKeyValuePairs = new ConcurrentHashMap<>();
@Override
public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(persistedKeyValuePair.getMapId());
if (persistedKeyValuePairs == null) {
persistedKeyValuePairs = new HashMap<>();
mapPersistedKeyValuePairs.put(persistedKeyValuePair.getMapId(), persistedKeyValuePairs);
}
persistedKeyValuePairs.put(persistedKeyValuePair.getKey(), persistedKeyValuePair);
}
@Override
public void deleteKeyValuePair(String mapId, String key) throws Exception {
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
if (persistedKeyValuePairs != null) {
persistedKeyValuePairs.remove(key);
}
}
@Override
public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) {
Map<String, PersistedKeyValuePair> persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId);
return persistedKeyValuePairs != null ? new HashMap<>(persistedKeyValuePairs) : new HashMap<>();
}
}
}

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@ -666,6 +667,21 @@ public class TransactionImplTest extends ActiveMQTestBase {
return null;
}
@Override
public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
}
@Override
public void deleteKeyValuePair(String mapId, String key) throws Exception {
}
@Override
public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) {
return null;
}
@Override
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return 0;

View File

@ -188,7 +188,10 @@
</pool>
</broker-balancer>
<broker-balancer name="least-connections-balancer">
<cache-timeout>60000</cache-timeout>
<cache>
<persisted>true</persisted>
<timeout>60000</timeout>
</cache>
<policy name="LEAST_CONNECTIONS"/>
<pool>
<check-period>3000</check-period>

View File

@ -179,7 +179,10 @@
</pool>
</broker-balancer>
<broker-balancer name="least-connections-balancer">
<cache-timeout>60000</cache-timeout>
<cache>
<persisted>true</persisted>
<timeout>60000</timeout>
</cache>
<policy name="LEAST_CONNECTIONS"/>
<pool>
<check-period>3000</check-period>

View File

@ -103,8 +103,19 @@ A policy is defined by the `policy` element. Let's take a look at a policy examp
The broker balancer provides a cache with a timeout to improve the stickiness of the target broker selected,
returning the same target broker for a target key as long as it is present in the cache and is ready.
So a broker balancer with the cache enabled doesn't strictly follow the configured policy.
By default, the cache is enabled and will never timeout. See below
for more details about setting the `cache-timeout` parameter.
By default, the cache is not enabled.
A cache is defined by the `cache` element that includes the following items:
* the `persisted` element defines whether the cache has to persist entries, default is `false`;
* the `timeout` element defines the timeout before removing entries, measured in milliseconds, setting 0 will disable the timeout, default is `0`.
Let's take a look at a cache example from broker.xml:
```xml
<cache>
<persisted>true</persisted>
<timeout>60000</timeout>
</cache>
```
## Key transformers
A `local-target-key-transformer` allows target key transformation before matching against any local-target-filter. One use case is
@ -119,9 +130,8 @@ A broker balancer is defined by the `broker-balancer` element, it includes the f
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers);
* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
* the `pool` element defines the pool to group the target brokers, see [pools](#pools).
* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies);
* the `pool` element defines the pool to group the target brokers, see [pools](#pools);
* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies).
Let's take a look at some broker balancer examples from broker.xml:
```xml

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

After

Width:  |  Height:  |  Size: 12 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 12 KiB

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@ -151,6 +152,16 @@ public class BalancingTestBase extends ClusterTestBase {
}
protected void setupBalancerLocalCache(final int node, boolean persisted, int timeout) {
Configuration configuration = getServer(node).getConfiguration();
BrokerBalancerConfiguration brokerBalancerConfiguration = configuration.getBalancerConfigurations().stream()
.filter(brokerBalancerConfiguration1 -> brokerBalancerConfiguration1.getName().equals(BROKER_BALANCER_NAME)).findFirst().get();
brokerBalancerConfiguration.setCacheConfiguration(
new CacheConfiguration().setPersisted(persisted).setTimeout(timeout));
}
protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password) throws Exception {
return createFactory(protocol, sslEnabled, host, port, clientID, user, password, -1);
}
@ -215,7 +226,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries + "&failover.randomize=true");
urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("&jms.clientID=");
@ -243,7 +254,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
urlBuilder.append("?startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries);
urlBuilder.append("?randomize=false&startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("&jms.clientID=");

View File

@ -140,15 +140,27 @@ public class RedirectTest extends BalancingTestBase {
@Test
public void testRoundRobinRedirect() throws Exception {
testEvenlyRedirect(RoundRobinPolicy.NAME, null);
testEvenlyRedirect(RoundRobinPolicy.NAME, null, false);
}
@Test
public void testLeastConnectionsRedirect() throws Exception {
testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, String.valueOf(30)));
testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, String.valueOf(30)), false);
}
private void testEvenlyRedirect(final String policyName, final Map<String, String> properties) throws Exception {
@Test
public void testRoundRobinRedirectWithFailure() throws Exception {
testEvenlyRedirect(RoundRobinPolicy.NAME, null, true);
}
@Test
public void testLeastConnectionsRedirectWithFailure() throws Exception {
testEvenlyRedirect(LeastConnectionsPolicy.NAME, Collections.singletonMap(
LeastConnectionsPolicy.CONNECTION_COUNT_THRESHOLD, String.valueOf(30)), true);
}
private void testEvenlyRedirect(final String policyName, final Map<String, String> properties, final boolean withFailure) throws Exception {
final String queueName = "RedirectTestQueue";
final int targets = MULTIPLE_TARGETS;
int[] nodes = new int[targets + 1];
@ -174,6 +186,10 @@ public class RedirectTest extends BalancingTestBase {
setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, policyName, properties, false, null, targets, 1, 2, 3);
}
if (withFailure) {
setupBalancerLocalCache(0, true, 0);
}
startServers(nodes);
for (int node : nodes) {
@ -214,6 +230,12 @@ public class RedirectTest extends BalancingTestBase {
Assert.assertEquals("Messages of node " + targetNode, 1, queueControls[targetNode].countMessages());
}
if (withFailure) {
crashAndWaitForFailure(getServer(0));
startServers(0);
}
for (int i = 0; i < targets; i++) {
try (Connection connection = connectionFactories[i].createConnection()) {
connection.start();

View File

@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedUser;
@ -756,6 +757,21 @@ public class SendAckFailTest extends SpawnedTestBase {
return manager.getPersistedRoles();
}
@Override
public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception {
manager.storeKeyValuePair(persistedKeyValuePair);
}
@Override
public void deleteKeyValuePair(String mapId, String key) throws Exception {
manager.deleteKeyValuePair(mapId, key);
}
@Override
public Map<String, PersistedKeyValuePair> getPersistedKeyValuePairs(String mapId) {
return manager.getPersistedKeyValuePairs(mapId);
}
@Override
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return manager.storePageCounter(txID, queueID, value, size);