ARTEMIS-4204 connectors added via management are not durable
This commit is contained in:
parent
6d3dbc4383
commit
6851e7d677
|
@ -88,6 +88,7 @@ import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
|
|||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
|
@ -3969,6 +3970,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
|
||||
try {
|
||||
server.getConfiguration().addConnectorConfiguration(name, url);
|
||||
storageManager.storeConnector(new PersistedConnector(name, url));
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
@ -3985,6 +3987,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
|
||||
try {
|
||||
server.getConfiguration().getConnectorConfigurations().remove(name);
|
||||
storageManager.deleteConnector(name);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
|
|||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
|
||||
|
@ -369,6 +370,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
|||
|
||||
List<PersistedBridgeConfiguration> recoverBridgeConfigurations();
|
||||
|
||||
void storeConnector(PersistedConnector persistedConnector) throws Exception;
|
||||
|
||||
void deleteConnector(String connectorName) throws Exception;
|
||||
|
||||
List<PersistedConnector> recoverConnectors();
|
||||
|
||||
void storeUser(PersistedUser persistedUser) throws Exception;
|
||||
|
||||
void deleteUser(String username) throws Exception;
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.persistence.config;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.utils.BufferHelper;
|
||||
|
||||
public class PersistedConnector implements EncodingSupport {
|
||||
|
||||
private long storeId;
|
||||
|
||||
private String url;
|
||||
|
||||
private String name;
|
||||
|
||||
public PersistedConnector() {
|
||||
}
|
||||
|
||||
public PersistedConnector(String name, String url) {
|
||||
this.name = name;
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
public void setStoreId(long id) {
|
||||
this.storeId = id;
|
||||
}
|
||||
|
||||
public long getStoreId() {
|
||||
return storeId;
|
||||
}
|
||||
|
||||
public void setUrl(String url) {
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getEncodeSize() {
|
||||
int size = 0;
|
||||
size += BufferHelper.sizeOfString(name);
|
||||
size += BufferHelper.sizeOfString(url);
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(ActiveMQBuffer buffer) {
|
||||
buffer.writeString(name);
|
||||
buffer.writeString(url);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decode(ActiveMQBuffer buffer) {
|
||||
name = buffer.readString();
|
||||
url = buffer.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PersistedConnector [storeId=" + storeId +
|
||||
", name=" +
|
||||
name +
|
||||
", url=" +
|
||||
url +
|
||||
"]";
|
||||
}
|
||||
}
|
|
@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
|
||||
|
@ -225,6 +226,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
|
||||
protected final Map<String, PersistedBridgeConfiguration> mapPersistedBridgeConfigurations = new ConcurrentHashMap<>();
|
||||
|
||||
protected final Map<String, PersistedConnector> mapPersistedConnectors = new ConcurrentHashMap<>();
|
||||
|
||||
protected final Map<String, PersistedUser> mapPersistedUsers = new ConcurrentHashMap<>();
|
||||
|
||||
protected final Map<String, PersistedRole> mapPersistedRoles = new ConcurrentHashMap<>();
|
||||
|
@ -796,6 +799,32 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
return new ArrayList<>(mapPersistedBridgeConfigurations.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
|
||||
deleteConnector(persistedConnector.getName());
|
||||
try (ArtemisCloseable lock = closeableReadLock()) {
|
||||
final long id = idGenerator.generateID();
|
||||
persistedConnector.setStoreId(id);
|
||||
bindingsJournal.appendAddRecord(id, JournalRecordIds.CONNECTOR_RECORD, persistedConnector, true);
|
||||
mapPersistedConnectors.put(persistedConnector.getName(), persistedConnector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteConnector(String connectorName) throws Exception {
|
||||
PersistedConnector oldConnector = mapPersistedConnectors.remove(connectorName);
|
||||
if (oldConnector != null) {
|
||||
try (ArtemisCloseable lock = closeableReadLock()) {
|
||||
bindingsJournal.tryAppendDeleteRecord(oldConnector.getStoreId(), this::recordNotFoundCallback, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistedConnector> recoverConnectors() {
|
||||
return new ArrayList<>(mapPersistedConnectors.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeUser(PersistedUser persistedUser) throws Exception {
|
||||
deleteUser(persistedUser.getUsername());
|
||||
|
@ -1628,6 +1657,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
mapPersistedKeyValuePairs.put(keyValuePair.getMapId(), persistedKeyValuePairs);
|
||||
}
|
||||
persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
|
||||
} else if (rec == JournalRecordIds.CONNECTOR_RECORD) {
|
||||
PersistedConnector connector = newConnectorEncoding(id, buffer);
|
||||
mapPersistedConnectors.put(connector.getName(), connector);
|
||||
} else {
|
||||
// unlikely to happen
|
||||
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
|
||||
|
@ -2096,6 +2128,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
return persistedBridgeConfiguration;
|
||||
}
|
||||
|
||||
static PersistedConnector newConnectorEncoding(long id, ActiveMQBuffer buffer) {
|
||||
PersistedConnector persistedBridgeConfiguration = new PersistedConnector();
|
||||
persistedBridgeConfiguration.decode(buffer);
|
||||
persistedBridgeConfiguration.setStoreId(id);
|
||||
return persistedBridgeConfiguration;
|
||||
}
|
||||
|
||||
static PersistedUser newUserEncoding(long id, ActiveMQBuffer buffer) {
|
||||
PersistedUser persistedUser = new PersistedUser();
|
||||
persistedUser.decode(buffer);
|
||||
|
|
|
@ -102,4 +102,6 @@ public final class JournalRecordIds {
|
|||
public static final byte ADD_MESSAGE_BODY = 49;
|
||||
|
||||
public static final byte KEY_VALUE_PAIR_RECORD = 50;
|
||||
|
||||
public static final byte CONNECTOR_RECORD = 51;
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
|
||||
|
@ -481,6 +482,19 @@ public class NullStorageManager implements StorageManager {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteConnector(String connectorName) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistedConnector> recoverConnectors() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeUser(PersistedUser persistedUser) throws Exception {
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
|
||||
|
@ -3480,6 +3481,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
postOffice.startAddressQueueScanner();
|
||||
|
||||
recoverStoredConnectors();
|
||||
|
||||
recoverStoredBridges();
|
||||
}
|
||||
|
||||
|
@ -4296,6 +4299,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
}
|
||||
|
||||
private void recoverStoredConnectors() throws Exception {
|
||||
if (storageManager.recoverConnectors() != null) {
|
||||
for (PersistedConnector persistedConnector : storageManager.recoverConnectors()) {
|
||||
getConfiguration().addConnectorConfiguration(persistedConnector.getName(), persistedConnector.getUrl());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception {
|
||||
if (config != null) {
|
||||
GroupingHandler groupingHandler1;
|
||||
|
@ -4627,6 +4638,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
destroyBridge(existingBridge.getConfiguration().getParentName());
|
||||
}
|
||||
}
|
||||
recoverStoredConnectors();
|
||||
recoverStoredBridges();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
|
||||
|
@ -637,6 +638,19 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteConnector(String connectorName) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistedConnector> recoverConnectors() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeUser(PersistedUser persistedUser) throws Exception {
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
|
|||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
|
||||
|
@ -731,6 +732,19 @@ public class SendAckFailTest extends SpawnedTestBase {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteConnector(String connectorName) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistedConnector> recoverConnectors() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeUser(PersistedUser persistedUser) throws Exception {
|
||||
manager.storeUser(persistedUser);
|
||||
|
|
|
@ -150,8 +150,7 @@ public class ConfigChangeTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void bridgeConfigChagesPersist() throws Exception {
|
||||
|
||||
public void bridgeConfigChangesPersist() throws Exception {
|
||||
server = createServer(true);
|
||||
server.start();
|
||||
|
||||
|
@ -163,8 +162,8 @@ public class ConfigChangeTest extends ActiveMQTestBase {
|
|||
String queue = "Q1";
|
||||
String forward = "Q2";
|
||||
|
||||
session.createQueue(new QueueConfiguration("Q1").setAddress("Q1").setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
|
||||
session.createQueue(new QueueConfiguration("Q2").setAddress("Q2").setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
|
||||
session.createQueue(new QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
|
||||
session.createQueue(new QueueConfiguration(forward).setAddress(forward).setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
|
||||
session.close();
|
||||
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName(bridgeName)
|
||||
|
@ -179,16 +178,26 @@ public class ConfigChangeTest extends ActiveMQTestBase {
|
|||
server.getActiveMQServerControl().addConnector("connector2", "tcp://localhost:61616");
|
||||
server.getActiveMQServerControl().createBridge(bridgeConfiguration.toJSON());
|
||||
|
||||
Assert.assertEquals(2, server.getConfiguration().getConnectorConfigurations().size());
|
||||
Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
|
||||
server.stop();
|
||||
|
||||
// clear the in-memory connector configurations to force a reload from disk
|
||||
server.getConfiguration().getConnectorConfigurations().clear();
|
||||
|
||||
server.start();
|
||||
Assert.assertEquals(2, server.getConfiguration().getConnectorConfigurations().size());
|
||||
Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
|
||||
|
||||
server.getActiveMQServerControl().destroyBridge(bridgeName);
|
||||
server.getActiveMQServerControl().removeConnector("connector1");
|
||||
server.getActiveMQServerControl().removeConnector("connector2");
|
||||
Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
|
||||
Assert.assertEquals(0, server.getConfiguration().getConnectorConfigurations().size());
|
||||
server.stop();
|
||||
server.start();
|
||||
Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
|
||||
Assert.assertEquals(0, server.getConfiguration().getConnectorConfigurations().size());
|
||||
server.stop();
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.persistence;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ConnectorStorageTest extends StorageManagerTestBase {
|
||||
|
||||
public ConnectorStorageTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreConnector() throws Exception {
|
||||
final String NAME = RandomUtil.randomString();
|
||||
final String URL = RandomUtil.randomString();
|
||||
createStorage();
|
||||
|
||||
PersistedConnector connector = new PersistedConnector(NAME, URL);
|
||||
|
||||
journal.storeConnector(connector);
|
||||
|
||||
journal.stop();
|
||||
journal.start();
|
||||
|
||||
List<PersistedConnector> connectors = journal.recoverConnectors();
|
||||
|
||||
Assert.assertEquals(1, connectors.size());
|
||||
|
||||
PersistedConnector persistedConnector = connectors.get(0);
|
||||
Assert.assertEquals(NAME, persistedConnector.getName());
|
||||
Assert.assertEquals(URL, persistedConnector.getUrl());
|
||||
journal.stop();
|
||||
|
||||
journal = null;
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue