ARTEMIS-2908 - Persist Divert Configuration in Bindings journal

https://issues.apache.org/jira/browse/ARTEMIS-2908
This commit is contained in:
Andy Taylor 2020-09-21 10:58:15 +01:00 committed by Clebert Suconic
parent b689df1b4a
commit c77bf50db4
13 changed files with 435 additions and 1 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.logs.ActiveMQUtilBundle;
/**
* Helper methods to read and write from ActiveMQBuffer.
@ -155,5 +156,31 @@ public class BufferHelper {
}
}
public static int sizeOfNullableString(String s) {
if (s == null) {
return DataConstants.SIZE_BOOLEAN;
}
return DataConstants.SIZE_BOOLEAN + sizeOfString(s);
}
public static int sizeOfString(String s) {
int len = s.length();
if (len < 9) {
return DataConstants.SIZE_INT + (len * DataConstants.SIZE_SHORT);
}
// 4095 == 0xfff
if (len < 4095) {
// beware: this one has O(n) cost: look at UTF8Util::saveUTF
final int expectedEncodedUTF8Len = UTF8Util.calculateUTFSize(s);
if (expectedEncodedUTF8Len > 65535) {
throw ActiveMQUtilBundle.BUNDLE.stringTooLong(len);
}
return DataConstants.SIZE_INT + DataConstants.SIZE_SHORT + expectedEncodedUTF8Len;
}
// it seems weird but this SIZE_INT is required due to how UTF8Util is encoding UTF strings
// so this SIZE_INT is required
// perhaps we could optimize it and remove it, but that would break compatibility with older clients and journal
return DataConstants.SIZE_INT + sizeOfSimpleString(s);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.util;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.utils.BufferHelper;
import org.junit.Assert;
import org.junit.Test;
public class BufferHelperTest {
@Test
public void testSizeOfNullableString() {
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(0);
String[] tests = new String[]{"111111111", new String(new byte[4094]), new String(new byte[4095])};
for (String s : tests) {
buffer.resetReaderIndex();
buffer.resetWriterIndex();
buffer.writeNullableString(s);
int size = BufferHelper.sizeOfNullableString(s);
int written = buffer.writerIndex();
Assert.assertEquals(written, size);
String readString = buffer.readNullableString();
Assert.assertEquals(s, readString);
}
}
}

View File

@ -17,12 +17,17 @@
package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.utils.BufferHelper;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UUIDGenerator;
public class DivertConfiguration implements Serializable {
public class DivertConfiguration implements Serializable, EncodingSupport {
private static final long serialVersionUID = 6910543740464269629L;
@ -208,4 +213,69 @@ public class DivertConfiguration implements Serializable {
return false;
return true;
}
@Override
public int getEncodeSize() {
int transformerSize = 0;
if (transformerConfiguration != null) {
transformerSize += BufferHelper.sizeOfNullableString(transformerConfiguration.getClassName());
transformerSize += DataConstants.INT;
Map<String, String> properties = transformerConfiguration.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
transformerSize += BufferHelper.sizeOfNullableString(entry.getKey());
transformerSize += BufferHelper.sizeOfNullableString(entry.getValue());
}
}
int size = BufferHelper.sizeOfNullableString(name) +
BufferHelper.sizeOfNullableString(address) +
BufferHelper.sizeOfNullableString(forwardingAddress) +
BufferHelper.sizeOfNullableString(routingName) +
BufferHelper.sizeOfNullableBoolean(exclusive) +
BufferHelper.sizeOfNullableString(filterString) +
DataConstants.SIZE_BYTE + transformerSize;
return size;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeNullableString(name);
buffer.writeNullableString(address);
buffer.writeNullableString(forwardingAddress);
buffer.writeNullableString(routingName);
buffer.writeBoolean(exclusive);
buffer.writeNullableString(filterString);
buffer.writeByte(routingType != null ? routingType.getType() : ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()).getType());
if (transformerConfiguration != null) {
buffer.writeString(transformerConfiguration.getClassName());
Map<String, String> properties = transformerConfiguration.getProperties();
buffer.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
buffer.writeNullableString(entry.getKey());
buffer.writeNullableString(entry.getValue());
}
} else {
buffer.writeNullableString(null);
}
}
@Override
public void decode(ActiveMQBuffer buffer) {
name = buffer.readNullableString();
address = buffer.readNullableString();
forwardingAddress = buffer.readNullableString();
routingName = buffer.readNullableString();
exclusive = buffer.readBoolean();
filterString = buffer.readNullableString();
routingType = ComponentConfigurationRoutingType.getType(buffer.readByte());
String transformerClassName = buffer.readNullableString();
if (transformerClassName != null) {
transformerConfiguration = new TransformerConfiguration(transformerClassName);
int propsSize = buffer.readInt();
for (int i = 0; i < propsSize; i++) {
transformerConfiguration.getProperties().put(buffer.readNullableString(), buffer.readNullableString());
}
}
}
}

View File

@ -82,6 +82,7 @@ import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -3535,6 +3536,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
server.deployDivert(config);
storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(config));
} finally {
blockOnIO();
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -359,6 +360,11 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
List<PersistedRoles> recoverPersistedRoles() throws Exception;
void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception;
void deleteDivertConfiguration(String divertName) throws Exception;
List<PersistedDivertConfiguration> recoverDivertConfigurations();
/**
* @return The ID with the stored counter
*/

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.config;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class PersistedDivertConfiguration implements EncodingSupport {
private long storeId;
private DivertConfiguration divertConfiguration;
public PersistedDivertConfiguration(DivertConfiguration divertConfiguration) {
this.divertConfiguration = divertConfiguration;
}
public PersistedDivertConfiguration() {
divertConfiguration = new DivertConfiguration();
}
public void setStoreId(long id) {
this.storeId = id;
}
public long getStoreId() {
return storeId;
}
@Override
public int getEncodeSize() {
return divertConfiguration.getEncodeSize();
}
@Override
public void encode(ActiveMQBuffer buffer) {
divertConfiguration.encode(buffer);
}
@Override
public void decode(ActiveMQBuffer buffer) {
divertConfiguration.decode(buffer);
}
public String getName() {
return divertConfiguration.getName();
}
public DivertConfiguration getDivertConfiguration() {
return divertConfiguration;
}
}

View File

@ -74,6 +74,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
@ -198,6 +199,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<>();
protected final Map<String, PersistedDivertConfiguration> mapPersistedDivertConfigurations = new ConcurrentHashMap<>();
protected final ConcurrentLongHashMap<LargeServerMessage> largeMessagesToDelete = new ConcurrentLongHashMap<>();
public AbstractJournalStorageManager(final Configuration config,
@ -783,6 +786,38 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
@Override
public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception {
deleteDivertConfiguration(persistedDivertConfiguration.getName());
readLock();
try {
final long id = idGenerator.generateID();
persistedDivertConfiguration.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.DIVERT_RECORD, persistedDivertConfiguration, true);
mapPersistedDivertConfigurations.put(persistedDivertConfiguration.getName(), persistedDivertConfiguration);
} finally {
readUnLock();
}
}
@Override
public void deleteDivertConfiguration(String divertName) throws Exception {
PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName);
if (oldDivert != null) {
readLock();
try {
bindingsJournal.appendDeleteRecord(oldDivert.getStoreId(), false);
} finally {
readUnLock();
}
}
}
@Override
public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return new ArrayList<>(mapPersistedDivertConfigurations.values());
}
@Override
public void storeID(final long journalID, final long id) throws Exception {
readLock();
@ -1548,6 +1583,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
ActiveMQServerLogger.LOGGER.infoNoAddressWithID(statusEncoding.getAddressId(), statusEncoding.getId());
this.deleteAddressStatus(statusEncoding.getId());
}
} else if (rec == JournalRecordIds.DIVERT_RECORD) {
PersistedDivertConfiguration divertConfiguration = newDivertEncoding(id, buffer);
mapPersistedDivertConfigurations.put(divertConfiguration.getName(), divertConfiguration);
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
@ -2030,6 +2068,12 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return addressStatus;
}
static PersistedDivertConfiguration newDivertEncoding(long id, ActiveMQBuffer buffer) {
PersistedDivertConfiguration persistedDivertConfiguration = new PersistedDivertConfiguration();
persistedDivertConfiguration.decode(buffer);
persistedDivertConfiguration.setStoreId(id);
return persistedDivertConfiguration;
}
/**
* @param id
* @param buffer

View File

@ -47,6 +47,8 @@ public final class JournalRecordIds {
public static final byte SECURITY_RECORD = 26;
public static final byte DIVERT_RECORD = 27;
// Message journal record types
/**

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -441,6 +442,19 @@ public class NullStorageManager implements StorageManager {
return Collections.emptyList();
}
@Override
public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception {
}
@Override
public void deleteDivertConfiguration(String divertName) throws Exception {
}
@Override
public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return null;
}
@Override
public void storeSecurityRoles(final PersistedRoles persistedRoles) throws Exception {
}

View File

@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager;
@ -3364,6 +3365,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
securityRepository.addMatch(roleItem.getAddressMatch().toString(), setRoles);
}
List<PersistedDivertConfiguration> persistedDivertConfigurations = storageManager.recoverDivertConfigurations();
if (persistedDivertConfigurations != null) {
for (PersistedDivertConfiguration persistedDivertConfiguration : persistedDivertConfigurations) {
configuration.getDivertConfigurations().add(persistedDivertConfiguration.getDivertConfiguration());
}
}
}
@Override

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -610,6 +611,21 @@ public class TransactionImplTest extends ActiveMQTestBase {
return null;
}
@Override
public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception {
}
@Override
public void deleteDivertConfiguration(String divertName) throws Exception {
}
@Override
public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return null;
}
@Override
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return 0;

View File

@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -707,6 +708,21 @@ public class SendAckFailTest extends SpawnedTestBase {
return manager.recoverPersistedRoles();
}
@Override
public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception {
}
@Override
public void deleteDivertConfiguration(String divertName) throws Exception {
}
@Override
public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return null;
}
@Override
public long storePageCounter(long txID, long queueID, long value, long size) throws Exception {
return manager.storePageCounter(txID, queueID, value, size);

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class DivertConfigurationStorageTest extends StorageManagerTestBase {
public DivertConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Test
public void testStoreDivertConfiguration() throws Exception {
createStorage();
DivertConfiguration configuration = new DivertConfiguration();
configuration.setName("name");
configuration.setAddress("address");
configuration.setExclusive(true);
configuration.setForwardingAddress("forward");
configuration.setRoutingName("routiingName");
TransformerConfiguration mytransformer = new TransformerConfiguration("mytransformer");
mytransformer.getProperties().put("key1", "prop1");
mytransformer.getProperties().put("key2", "prop2");
mytransformer.getProperties().put("key3", "prop3");
configuration.setTransformerConfiguration(mytransformer);
journal.storeDivertConfiguration(new PersistedDivertConfiguration(configuration));
journal.stop();
journal.start();
List<PersistedDivertConfiguration> divertConfigurations = journal.recoverDivertConfigurations();
Assert.assertEquals(1, divertConfigurations.size());
PersistedDivertConfiguration persistedDivertConfiguration = divertConfigurations.get(0);
Assert.assertEquals(configuration.getName(), persistedDivertConfiguration.getDivertConfiguration().getName());
Assert.assertEquals(configuration.getAddress(), persistedDivertConfiguration.getDivertConfiguration().getAddress());
Assert.assertEquals(configuration.isExclusive(), persistedDivertConfiguration.getDivertConfiguration().isExclusive());
Assert.assertEquals(configuration.getForwardingAddress(), persistedDivertConfiguration.getDivertConfiguration().getForwardingAddress());
Assert.assertEquals(configuration.getRoutingName(), persistedDivertConfiguration.getDivertConfiguration().getRoutingName());
Assert.assertNotNull(persistedDivertConfiguration.getDivertConfiguration().getTransformerConfiguration());
Assert.assertEquals("mytransformer", persistedDivertConfiguration.getDivertConfiguration().getTransformerConfiguration().getClassName());
Map<String, String> properties = persistedDivertConfiguration.getDivertConfiguration().getTransformerConfiguration().getProperties();
Assert.assertEquals(3, properties.size());
Assert.assertEquals("prop1", properties.get("key1"));
Assert.assertEquals("prop2", properties.get("key2"));
Assert.assertEquals("prop3", properties.get("key3"));
journal.stop();
journal = null;
}
@Test
public void testStoreDivertConfigurationNoTransformer() throws Exception {
createStorage();
DivertConfiguration configuration = new DivertConfiguration();
configuration.setName("name");
configuration.setAddress("address");
configuration.setExclusive(true);
configuration.setForwardingAddress("forward");
configuration.setRoutingName("routiingName");
journal.storeDivertConfiguration(new PersistedDivertConfiguration(configuration));
journal.stop();
journal.start();
List<PersistedDivertConfiguration> divertConfigurations = journal.recoverDivertConfigurations();
Assert.assertEquals(1, divertConfigurations.size());
PersistedDivertConfiguration persistedDivertConfiguration = divertConfigurations.get(0);
Assert.assertEquals(configuration.getName(), persistedDivertConfiguration.getDivertConfiguration().getName());
Assert.assertEquals(configuration.getAddress(), persistedDivertConfiguration.getDivertConfiguration().getAddress());
Assert.assertEquals(configuration.isExclusive(), persistedDivertConfiguration.getDivertConfiguration().isExclusive());
Assert.assertEquals(configuration.getForwardingAddress(), persistedDivertConfiguration.getDivertConfiguration().getForwardingAddress());
Assert.assertEquals(configuration.getRoutingName(), persistedDivertConfiguration.getDivertConfiguration().getRoutingName());
Assert.assertNull(persistedDivertConfiguration.getDivertConfiguration().getTransformerConfiguration());
journal.stop();
journal = null;
}
}