ARTEMIS-4016 Bridges created by management operations are removed on restart and config reload

This commit is contained in:
AntonRoskvist 2022-09-26 17:26:36 +02:00 committed by Justin Bertram
parent 24f0d4e3e8
commit 04ddeb647c
14 changed files with 539 additions and 7 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.config;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObject;
@ -31,6 +32,8 @@ import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.utils.BufferHelper;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.JsonLoader;
public final class BridgeConfiguration implements Serializable {
@ -62,6 +65,7 @@ public final class BridgeConfiguration implements Serializable {
public static String CALL_TIMEOUT = "call-timeout";
public static String ROUTING_TYPE = "routing-type";
public static String CONCURRENCY = "concurrency";
public static String CONFIGURATION_MANAGED = "configuration-managed";
private String name = null;
@ -118,6 +122,8 @@ public final class BridgeConfiguration implements Serializable {
private String parentName = null;
private boolean configurationManaged = true;
public BridgeConfiguration() {
}
@ -148,6 +154,7 @@ public final class BridgeConfiguration implements Serializable {
callTimeout = other.callTimeout;
routingType = other.routingType;
concurrency = other.concurrency;
configurationManaged = other.configurationManaged;
}
public BridgeConfiguration(String name) {
@ -527,6 +534,15 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
public boolean isConfigurationManaged() {
return configurationManaged;
}
public BridgeConfiguration setConfigurationManaged(boolean configurationManaged) {
this.configurationManaged = configurationManaged;
return this;
}
public ComponentConfigurationRoutingType getRoutingType() {
return routingType;
}
@ -611,6 +627,7 @@ public final class BridgeConfiguration implements Serializable {
builder.add(MIN_LARGE_MESSAGE_SIZE, getMinLargeMessageSize());
builder.add(CALL_TIMEOUT, getCallTimeout());
builder.add(CONCURRENCY, getConcurrency());
builder.add(CONFIGURATION_MANAGED, isConfigurationManaged());
// complex fields (only serialize if value is not null)
@ -705,6 +722,7 @@ public final class BridgeConfiguration implements Serializable {
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
result = prime * result + ((user == null) ? 0 : user.hashCode());
result = prime * result + concurrency;
result = prime * result + (configurationManaged ? 1231 : 1237);
return result;
}
@ -790,7 +808,147 @@ public final class BridgeConfiguration implements Serializable {
return false;
if (concurrency != other.concurrency)
return false;
if (configurationManaged != other.configurationManaged)
return false;
return true;
}
public int getEncodeSize() {
int transformerSize = 0;
if (transformerConfiguration != null) {
transformerSize += BufferHelper.sizeOfNullableString(transformerConfiguration.getClassName());
transformerSize += DataConstants.INT;
Map<String, String> properties = transformerConfiguration.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
transformerSize += BufferHelper.sizeOfNullableString(entry.getKey());
transformerSize += BufferHelper.sizeOfNullableString(entry.getValue());
}
}
int staticConnectorSize = 0;
if (staticConnectors != null) {
staticConnectorSize += BufferHelper.sizeOfNullableInteger(staticConnectors.size());
for (String connector : staticConnectors) {
staticConnectorSize += BufferHelper.sizeOfNullableString(connector);
}
}
int size = BufferHelper.sizeOfNullableString(name) +
BufferHelper.sizeOfNullableString(parentName) +
BufferHelper.sizeOfNullableString(queueName) +
BufferHelper.sizeOfNullableString(forwardingAddress) +
BufferHelper.sizeOfNullableString(filterString) +
BufferHelper.sizeOfNullableString(discoveryGroupName) +
BufferHelper.sizeOfNullableBoolean(ha) +
BufferHelper.sizeOfNullableLong(retryInterval) +
BufferHelper.sizeOfNullableDouble(retryIntervalMultiplier) +
BufferHelper.sizeOfNullableInteger(initialConnectAttempts) +
BufferHelper.sizeOfNullableInteger(reconnectAttempts) +
BufferHelper.sizeOfNullableInteger(reconnectAttemptsOnSameNode) +
BufferHelper.sizeOfNullableBoolean(useDuplicateDetection) +
BufferHelper.sizeOfNullableInteger(confirmationWindowSize) +
BufferHelper.sizeOfNullableInteger(producerWindowSize) +
BufferHelper.sizeOfNullableLong(clientFailureCheckPeriod) +
BufferHelper.sizeOfNullableString(user) +
BufferHelper.sizeOfNullableString(password) +
BufferHelper.sizeOfNullableLong(connectionTTL) +
BufferHelper.sizeOfNullableLong(maxRetryInterval) +
BufferHelper.sizeOfNullableInteger(minLargeMessageSize) +
BufferHelper.sizeOfNullableLong(callTimeout) +
BufferHelper.sizeOfNullableInteger(concurrency) +
BufferHelper.sizeOfNullableBoolean(configurationManaged) +
DataConstants.SIZE_BYTE +
transformerSize +
staticConnectorSize;
return size;
}
public void encode(ActiveMQBuffer buffer) {
buffer.writeNullableString(name);
buffer.writeNullableString(parentName);
buffer.writeNullableString(queueName);
buffer.writeNullableString(forwardingAddress);
buffer.writeNullableString(filterString);
buffer.writeNullableString(discoveryGroupName);
buffer.writeNullableBoolean(ha);
buffer.writeNullableLong(retryInterval);
BufferHelper.writeNullableDouble(buffer, retryIntervalMultiplier);
buffer.writeNullableInt(initialConnectAttempts);
buffer.writeNullableInt(reconnectAttempts);
buffer.writeNullableInt(reconnectAttemptsOnSameNode);
buffer.writeNullableBoolean(useDuplicateDetection);
buffer.writeNullableInt(confirmationWindowSize);
buffer.writeNullableInt(producerWindowSize);
buffer.writeNullableLong(clientFailureCheckPeriod);
buffer.writeNullableString(user);
buffer.writeNullableString(password);
buffer.writeNullableLong(connectionTTL);
buffer.writeNullableLong(maxRetryInterval);
buffer.writeNullableInt(minLargeMessageSize);
buffer.writeNullableLong(callTimeout);
buffer.writeNullableInt(concurrency);
buffer.writeNullableBoolean(configurationManaged);
buffer.writeByte(routingType != null ? routingType.getType() : ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()).getType());
if (transformerConfiguration != null) {
buffer.writeString(transformerConfiguration.getClassName());
Map<String, String> properties = transformerConfiguration.getProperties();
buffer.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
buffer.writeNullableString(entry.getKey());
buffer.writeNullableString(entry.getValue());
}
} else {
buffer.writeNullableString(null);
}
if (staticConnectors != null) {
buffer.writeInt(staticConnectors.size());
for (String connector : staticConnectors) {
buffer.writeNullableString(connector);
}
} else {
buffer.writeInt(0);
}
}
public void decode(ActiveMQBuffer buffer) {
name = buffer.readNullableString();
parentName = buffer.readNullableString();
queueName = buffer.readNullableString();
forwardingAddress = buffer.readNullableString();
filterString = buffer.readNullableString();
discoveryGroupName = buffer.readNullableString();
ha = buffer.readNullableBoolean();
retryInterval = buffer.readNullableLong();
retryIntervalMultiplier = BufferHelper.readNullableDouble(buffer);
initialConnectAttempts = buffer.readNullableInt();
reconnectAttempts = buffer.readNullableInt();
reconnectAttemptsOnSameNode = buffer.readNullableInt();
useDuplicateDetection = buffer.readNullableBoolean();
confirmationWindowSize = buffer.readNullableInt();
producerWindowSize = buffer.readNullableInt();
clientFailureCheckPeriod = buffer.readNullableLong();
user = buffer.readNullableString();
password = buffer.readNullableString();
connectionTTL = buffer.readNullableLong();
maxRetryInterval = buffer.readNullableLong();
minLargeMessageSize = buffer.readNullableInt();
callTimeout = buffer.readNullableLong();
concurrency = buffer.readNullableInt();
configurationManaged = buffer.readNullableBoolean();
routingType = ComponentConfigurationRoutingType.getType(buffer.readByte());
String transformerClassName = buffer.readNullableString();
if (transformerClassName != null) {
transformerConfiguration = new TransformerConfiguration(transformerClassName);
int propsSize = buffer.readInt();
for (int i = 0; i < propsSize; i++) {
transformerConfiguration.getProperties().put(buffer.readNullableString(), buffer.readNullableString());
}
}
int numStaticConnectors = buffer.readInt();
if (numStaticConnectors > 0) {
staticConnectors = new ArrayList<>();
for (int i = 0; i < numStaticConnectors; i++) {
staticConnectors.add(buffer.readNullableString());
}
}
}
}

View File

@ -3847,7 +3847,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null || transformerClassName.isEmpty() ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password).setConfigurationManaged(false);
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
@ -3891,7 +3891,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null || transformerClassName.isEmpty() ? null : new TransformerConfiguration(transformerClassName);
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password).setConfigurationManaged(false);
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
@ -3920,6 +3920,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (bridgeConfiguration == null) {
throw ActiveMQMessageBundle.BUNDLE.failedToParseJson(bridgeConfigurationAsJson);
}
bridgeConfiguration.setConfigurationManaged(false);
server.deployBridge(bridgeConfiguration);
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@ -347,6 +348,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
List<PersistedDivertConfiguration> recoverDivertConfigurations();
void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception;
void deleteBridgeConfiguration(String bridgeName) throws Exception;
List<PersistedBridgeConfiguration> recoverBridgeConfigurations();
void storeUser(PersistedUser persistedUser) throws Exception;
void deleteUser(String username) throws Exception;

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.config;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class PersistedBridgeConfiguration implements EncodingSupport {
private long storeId;
private BridgeConfiguration bridgeConfiguration;
@Override
public String toString() {
return "PersistedBridgeConfiguration{" + "storeId=" + storeId + ", bridgeConfiguration=" + bridgeConfiguration + '}';
}
public PersistedBridgeConfiguration(BridgeConfiguration bridgeConfiguration) {
this.bridgeConfiguration = bridgeConfiguration;
}
public PersistedBridgeConfiguration() {
bridgeConfiguration = new BridgeConfiguration();
}
public void setStoreId(long id) {
this.storeId = id;
}
public long getStoreId() {
return storeId;
}
@Override
public int getEncodeSize() {
return bridgeConfiguration.getEncodeSize();
}
@Override
public void encode(ActiveMQBuffer buffer) {
bridgeConfiguration.encode(buffer);
}
@Override
public void decode(ActiveMQBuffer buffer) {
bridgeConfiguration.decode(buffer);
}
public String getName() {
return bridgeConfiguration.getParentName();
}
public BridgeConfiguration getBridgeConfiguration() {
return bridgeConfiguration;
}
}

View File

@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@ -223,6 +224,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final Map<String, PersistedDivertConfiguration> mapPersistedDivertConfigurations = new ConcurrentHashMap<>();
protected final Map<String, PersistedBridgeConfiguration> mapPersistedBridgeConfigurations = new ConcurrentHashMap<>();
protected final Map<String, PersistedUser> mapPersistedUsers = new ConcurrentHashMap<>();
protected final Map<String, PersistedRole> mapPersistedRoles = new ConcurrentHashMap<>();
@ -768,6 +771,32 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return new ArrayList<>(mapPersistedDivertConfigurations.values());
}
@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
deleteBridgeConfiguration(persistedBridgeConfiguration.getName());
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedBridgeConfiguration.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.BRIDGE_RECORD, persistedBridgeConfiguration, true);
mapPersistedBridgeConfigurations.put(persistedBridgeConfiguration.getName(), persistedBridgeConfiguration);
}
}
@Override
public void deleteBridgeConfiguration(String bridgeName) throws Exception {
PersistedBridgeConfiguration oldBridge = mapPersistedBridgeConfigurations.remove(bridgeName);
if (oldBridge != null) {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.tryAppendDeleteRecord(oldBridge.getStoreId(), this::recordNotFoundCallback, false);
}
}
}
@Override
public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return new ArrayList<>(mapPersistedBridgeConfigurations.values());
}
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
deleteUser(persistedUser.getUsername());
@ -1565,6 +1594,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} else if (rec == JournalRecordIds.DIVERT_RECORD) {
PersistedDivertConfiguration divertConfiguration = newDivertEncoding(id, buffer);
mapPersistedDivertConfigurations.put(divertConfiguration.getName(), divertConfiguration);
} else if (rec == JournalRecordIds.BRIDGE_RECORD) {
PersistedBridgeConfiguration bridgeConfiguration = newBridgeEncoding(id, buffer);
mapPersistedBridgeConfigurations.put(bridgeConfiguration.getName(), bridgeConfiguration);
} else if (rec == JournalRecordIds.USER_RECORD) {
PersistedUser user = newUserEncoding(id, buffer);
mapPersistedUsers.put(user.getUsername(), user);
@ -2047,6 +2079,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return persistedDivertConfiguration;
}
static PersistedBridgeConfiguration newBridgeEncoding(long id, ActiveMQBuffer buffer) {
PersistedBridgeConfiguration persistedBridgeConfiguration = new PersistedBridgeConfiguration();
persistedBridgeConfiguration.decode(buffer);
persistedBridgeConfiguration.setStoreId(id);
return persistedBridgeConfiguration;
}
static PersistedUser newUserEncoding(long id, ActiveMQBuffer buffer) {
PersistedUser persistedUser = new PersistedUser();
persistedUser.decode(buffer);

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
@ -99,6 +100,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.USER_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.BRIDGE_RECORD;
/**
* Outputs a String description of the Journals contents.
@ -604,6 +606,12 @@ public final class DescribeJournal {
persistedDivertConfiguration.decode(buffer);
return persistedDivertConfiguration;
case BRIDGE_RECORD: {
PersistedBridgeConfiguration persistedBridgeConfiguration = new PersistedBridgeConfiguration();
persistedBridgeConfiguration.decode(buffer);
return persistedBridgeConfiguration;
}
case ADD_LARGE_MESSAGE_PENDING: {
PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
lmEncoding.decode(buffer);

View File

@ -49,6 +49,8 @@ public final class JournalRecordIds {
public static final byte DIVERT_RECORD = 27;
public static final byte BRIDGE_RECORD = 28;
// Message journal record types
/**

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@ -466,6 +467,19 @@ public class NullStorageManager implements StorageManager {
return null;
}
@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
}
@Override
public void deleteBridgeConfiguration(String bridgeName) throws Exception {
}
@Override
public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return null;
}
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
}

View File

@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
@ -2945,8 +2946,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public boolean deployBridge(BridgeConfiguration config) throws Exception {
if (clusterManager != null) {
return clusterManager.deployBridge(config);
if (clusterManager != null && clusterManager.deployBridge(config)) {
//copying and modifying bridgeConfig before storing to deal with "concurrency > 1" bridges
for (Bridge bridge : clusterManager.getBridges().values()) {
BridgeConfiguration copyConfig = new BridgeConfiguration(bridge.getConfiguration());
if (copyConfig.getConcurrency() > 1 && !copyConfig.getName().endsWith("-0")) {
continue;
}
copyConfig.setName(copyConfig.getParentName());
storageManager.storeBridgeConfiguration(new PersistedBridgeConfiguration(copyConfig));
}
return true;
}
return false;
}
@ -2954,7 +2964,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void destroyBridge(String name) throws Exception {
if (clusterManager != null) {
clusterManager.destroyBridge(name);
//Iterating through all bridges to catch "concurrency > 1" bridges matching supplied name
for (Bridge bridge : clusterManager.getBridges().values()) {
if (bridge.getConfiguration().getParentName().equals(name)) {
clusterManager.destroyBridge(bridge.getConfiguration().getName());
}
}
storageManager.deleteBridgeConfiguration(name);
}
}
@ -3431,6 +3447,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
postOffice.startExpiryScanner();
postOffice.startAddressQueueScanner();
recoverStoredBridges();
}
if (configuration.getMaxDiskUsage() != -1) {
@ -4236,6 +4254,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
private void recoverStoredBridges() throws Exception {
if (storageManager.recoverBridgeConfigurations() != null) {
for (PersistedBridgeConfiguration persistedBridgeConfiguration : storageManager.recoverBridgeConfigurations()) {
deployBridge(persistedBridgeConfiguration.getBridgeConfiguration());
}
}
}
private void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception {
if (config != null) {
GroupingHandler groupingHandler1;
@ -4544,7 +4570,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) {
newBridgeConfig.setParentName(newBridgeConfig.getName());
Bridge existingBridge = clusterManager.getBridges().get(newBridgeConfig.getParentName());
if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig)) {
if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig) && existingBridge.getConfiguration().isConfigurationManaged()) {
// this is an existing bridge but the config changed so stop the current bridge and deploy the new one
destroyBridge(existingBridge.getName().toString());
deployBridge(newBridgeConfig);
@ -4557,11 +4583,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
List<BridgeConfiguration> newConfig = configuration.getBridgeConfigurations();
BridgeConfiguration running = new BridgeConfiguration(runningBridge.getConfiguration());
running.set("name", running.getParentName());
if (!configuration.getBridgeConfigurations().contains(running)) {
if (!configuration.getBridgeConfigurations().contains(running) && running.isConfigurationManaged()) {
// this bridge is running but it isn't in the new config which means it was removed so destroy it
destroyBridge(runningBridge.getName().toString());
}
}
recoverStoredBridges();
}
}

View File

@ -192,6 +192,7 @@ public class BridgeConfigurationTest {
objectBuilder.add(BridgeConfiguration.CALL_TIMEOUT, 12);
objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST");
objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1);
objectBuilder.add(BridgeConfiguration.CONFIGURATION_MANAGED, true);
return objectBuilder.build();
}

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@ -622,6 +623,19 @@ public class TransactionImplTest extends ActiveMQTestBase {
return null;
}
@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
}
@Override
public void deleteBridgeConfiguration(String bridgeName) throws Exception {
}
@Override
public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return null;
}
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {

View File

@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@ -716,6 +717,19 @@ public class SendAckFailTest extends SpawnedTestBase {
return null;
}
@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
}
@Override
public void deleteBridgeConfiguration(String bridgeName) throws Exception {
}
@Override
public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return null;
}
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
manager.storeUser(persistedUser);

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BridgeConfigurationStorageTest extends StorageManagerTestBase {
public BridgeConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Test
public void testStoreBridgeConfiguration() throws Exception {
createStorage();
BridgeConfiguration configuration = new BridgeConfiguration();
configuration.setName("name");
configuration.setParentName("name");
configuration.setQueueName("QueueName");
configuration.setConcurrency(2);
configuration.setForwardingAddress("forward");
configuration.setProducerWindowSize(123123);
configuration.setConfirmationWindowSize(123123);
configuration.setStaticConnectors(Arrays.asList("connector1", "connector2"));
TransformerConfiguration mytransformer = new TransformerConfiguration("mytransformer");
mytransformer.getProperties().put("key1", "prop1");
mytransformer.getProperties().put("key2", "prop2");
mytransformer.getProperties().put("key3", "prop3");
configuration.setTransformerConfiguration(mytransformer);
journal.storeBridgeConfiguration(new PersistedBridgeConfiguration(configuration));
journal.stop();
journal.start();
List<PersistedBridgeConfiguration> bridgeConfigurations = journal.recoverBridgeConfigurations();
Assert.assertEquals(1, bridgeConfigurations.size());
PersistedBridgeConfiguration persistedBridgeConfiguration = bridgeConfigurations.get(0);
Assert.assertEquals(configuration.getName(), persistedBridgeConfiguration.getBridgeConfiguration().getName());
Assert.assertEquals(configuration.getQueueName(), persistedBridgeConfiguration.getBridgeConfiguration().getQueueName());
Assert.assertEquals(configuration.getConcurrency(), persistedBridgeConfiguration.getBridgeConfiguration().getConcurrency());
Assert.assertEquals(configuration.getForwardingAddress(), persistedBridgeConfiguration.getBridgeConfiguration().getForwardingAddress());
Assert.assertEquals(configuration.getStaticConnectors(), persistedBridgeConfiguration.getBridgeConfiguration().getStaticConnectors());
Assert.assertNotNull(persistedBridgeConfiguration.getBridgeConfiguration().getTransformerConfiguration());
Assert.assertEquals("mytransformer", persistedBridgeConfiguration.getBridgeConfiguration().getTransformerConfiguration().getClassName());
Map<String, String> properties = persistedBridgeConfiguration.getBridgeConfiguration().getTransformerConfiguration().getProperties();
Assert.assertEquals(3, properties.size());
Assert.assertEquals("prop1", properties.get("key1"));
Assert.assertEquals("prop2", properties.get("key2"));
Assert.assertEquals("prop3", properties.get("key3"));
journal.stop();
journal = null;
}
@Test
public void testStoreBridgeConfigurationNoTransformer() throws Exception {
createStorage();
BridgeConfiguration configuration = new BridgeConfiguration();
configuration.setName("name");
configuration.setQueueName("QueueName");
configuration.setConcurrency(2);
configuration.setForwardingAddress("forward");
configuration.setStaticConnectors(Arrays.asList("connector1", "connector2"));
journal.storeBridgeConfiguration(new PersistedBridgeConfiguration(configuration));
journal.stop();
journal.start();
List<PersistedBridgeConfiguration> bridgeConfigurations = journal.recoverBridgeConfigurations();
Assert.assertEquals(1, bridgeConfigurations.size());
PersistedBridgeConfiguration persistedBridgeConfiguration = bridgeConfigurations.get(0);
Assert.assertEquals(configuration.getName(), persistedBridgeConfiguration.getBridgeConfiguration().getName());
Assert.assertEquals(configuration.getQueueName(), persistedBridgeConfiguration.getBridgeConfiguration().getQueueName());
Assert.assertEquals(configuration.getConcurrency(), persistedBridgeConfiguration.getBridgeConfiguration().getConcurrency());
Assert.assertEquals(configuration.getForwardingAddress(), persistedBridgeConfiguration.getBridgeConfiguration().getForwardingAddress());
Assert.assertEquals(configuration.getStaticConnectors(), persistedBridgeConfiguration.getBridgeConfiguration().getStaticConnectors());
Assert.assertNull(persistedBridgeConfiguration.getBridgeConfiguration().getTransformerConfiguration());
journal.stop();
journal = null;
}
}

View File

@ -21,17 +21,23 @@ import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class ConfigChangeTest extends ActiveMQTestBase {
@ -142,4 +148,49 @@ public class ConfigChangeTest extends ActiveMQTestBase {
server.stop();
}
@Test
public void bridgeConfigChagesPersist() throws Exception {
server = createServer(true);
server.start();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true));
String bridgeName = "bridgeName";
String queue = "Q1";
String forward = "Q2";
session.createQueue(new QueueConfiguration("Q1").setAddress("Q1").setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
session.createQueue(new QueueConfiguration("Q2").setAddress("Q2").setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
session.close();
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName(bridgeName)
.setQueueName(queue)
.setConcurrency(2)
.setForwardingAddress(forward)
.setProducerWindowSize(1234)
.setConfirmationWindowSize(1234)
.setStaticConnectors(Arrays.asList("connector1", "connector2"));
server.getActiveMQServerControl().addConnector("connector1", "tcp://localhost:61616");
server.getActiveMQServerControl().addConnector("connector2", "tcp://localhost:61616");
server.getActiveMQServerControl().createBridge(bridgeConfiguration.toJSON());
Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
server.stop();
server.start();
Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
server.getActiveMQServerControl().destroyBridge(bridgeName);
Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
server.stop();
server.start();
Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
server.stop();
}
}