ARTEMIS-4651 Performance improvements on Mirror and Paging

Clebert Suconic 2024-02-02 16:48:45 -05:00 committed by clebertsuconic
parent f4b59c9b25
commit db94b18b73
53 changed files with 2858 additions and 650 deletions

View File

@ -268,6 +268,10 @@ public interface Message {
* Return null if not scheduled. */
Long getScheduledDeliveryTime();
void setPaged();
boolean isPaged();
default Message setScheduledDeliveryTime(Long time) {
return this;
}

View File

@ -123,9 +123,13 @@ public final class ManagementHelper {
ClientMessage reply = requestor.request(message);
if (ManagementHelper.hasOperationSucceeded(reply)) {
ok.accept(reply);
if (ok != null) {
ok.accept(reply);
}
} else {
failed.accept(reply);
if (failed != null) {
failed.accept(reply);
}
}
}

View File

@ -65,6 +65,10 @@ public class SimpleManagement implements AutoCloseable {
return this;
}
public String getUri() {
return uri;
}
@Override
public void close() throws Exception {
if (session != null) {
@ -81,6 +85,10 @@ public class SimpleManagement implements AutoCloseable {
return simpleManagementLong("broker", "getCurrentTimeMillis");
}
public void rebuildPageCounters() throws Exception {
simpleManagementVoid("broker", "rebuildPageCounters");
}
/** Simple helper for management returning a string.*/
public String simpleManagement(String resource, String method, Object... parameters) throws Exception {
AtomicReference<String> responseString = new AtomicReference<>();
@ -95,6 +103,11 @@ public class SimpleManagement implements AutoCloseable {
return responseLong.get();
}
/** Simple helper for management void calls.*/
public void simpleManagementVoid(String resource, String method, Object... parameters) throws Exception {
doManagement((m) -> setupCall(m, resource, method, parameters), null, SimpleManagement::failed);
}
public int simpleManagementInt(String resource, String method, Object... parameters) throws Exception {
AtomicInteger responseInt = new AtomicInteger();
doManagement((m) -> setupCall(m, resource, method, parameters), m -> setIntResult(m, responseInt), SimpleManagement::failed);
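
The new rebuildPageCounters() helper above goes through simpleManagementVoid, which deliberately passes a null success callback; the null-guards added to ManagementHelper in the previous file make that legal. A minimal usage sketch (the broker URI and credentials are placeholders, and the three-argument SimpleManagement constructor is assumed from its existing API):

// hedged sketch: ask the broker to rebuild its page counters via the new helper
try (SimpleManagement management = new SimpleManagement("tcp://localhost:61616", "admin", "admin")) {
   management.rebuildPageCounters(); // void management call; only failures are reported back
}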

View File

@ -96,6 +96,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
protected boolean durable;
protected boolean paged;
/**
* GMT milliseconds at which this message expires. 0 means never expires.
*/
@ -123,6 +125,16 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
this.coreMessageObjectPools = null;
}
@Override
public void setPaged() {
this.paged = true;
}
@Override
public boolean isPaged() {
return paged;
}
@Override
public String getProtocolName() {
return ActiveMQClient.DEFAULT_CORE_PROTOCOL;

View File

@ -42,6 +42,16 @@ public class MessageInternalImpl implements MessageInternal {
return getClass().getName();
}
@Override
public void setPaged() {
message.setPaged();
}
@Override
public boolean isPaged() {
return message.isPaged();
}
public MessageInternalImpl(ICoreMessage message) {
this.message = (CoreMessage) message;
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.core.journal;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
public class RecordInfo {
public RecordInfo(final long id,
@ -54,6 +58,10 @@ public class RecordInfo {
public boolean replaceableUpdate;
public ActiveMQBuffer wrapData() {
return new ChannelBufferWrapper(Unpooled.wrappedBuffer(data), true);
}
public byte getUserRecordType() {
return userRecordType;
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal.collections;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.utils.DataConstants;
public abstract class AbstractHashMapPersister<K, V> implements Persister<JournalHashMap.MapRecord<K, V>> {
private final byte VERSION = 0;
@Override
public byte getID() {
return 0;
}
@Override
public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
return DataConstants.SIZE_LONG + // recordID
DataConstants.SIZE_BYTE + // Version
DataConstants.SIZE_LONG + // collectionID
getKeySize(record.key) +
getValueSize(record.value);
}
protected abstract int getKeySize(K key);
protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
protected abstract K decodeKey(ActiveMQBuffer buffer);
protected abstract int getValueSize(V value);
protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
@Override
public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K, V> record) {
buffer.writeLong(record.id);
buffer.writeByte(VERSION);
buffer.writeLong(record.collectionID);
encodeKey(buffer, record.key);
encodeValue(buffer, record.value);
}
@Override
public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
JournalHashMap.MapRecord<K, V> record,
CoreMessageObjectPools pool) {
long id = buffer.readLong();
byte version = buffer.readByte();
assert version == VERSION;
long collectionID = buffer.readLong();
K key = decodeKey(buffer);
V value = decodeValue(buffer, key);
JournalHashMap.MapRecord<K, V> mapRecord = new JournalHashMap.MapRecord<>(collectionID, id, key, value);
return mapRecord;
}
}
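
To make the contract above concrete, here is a hedged sketch of a subclass using long keys and values (imports as in the class above). It is illustrative only; the persister this commit actually wires in is AckRetry.getPersister(), used by AckManager further down.

// illustrative persister: the sizes reported by get*Size must match what encode* writes
public class LongPairPersister extends AbstractHashMapPersister<Long, Long> {
   @Override
   protected int getKeySize(Long key) {
      return DataConstants.SIZE_LONG;
   }
   @Override
   protected void encodeKey(ActiveMQBuffer buffer, Long key) {
      buffer.writeLong(key);
   }
   @Override
   protected Long decodeKey(ActiveMQBuffer buffer) {
      return buffer.readLong();
   }
   @Override
   protected int getValueSize(Long value) {
      return DataConstants.SIZE_LONG;
   }
   @Override
   protected void encodeValue(ActiveMQBuffer buffer, Long value) {
      buffer.writeLong(value);
   }
   @Override
   protected Long decodeValue(ActiveMQBuffer buffer, Long key) {
      return buffer.readLong();
   }
}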

View File

@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal.collections;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JournalHashMap<K, V, C> implements Map<K, V> {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static class MapRecord<K, V> implements Entry<K, V> {
final long collectionID;
final long id;
final K key;
V value;
MapRecord(long collectionID, long id, K key, V value) {
this.collectionID = collectionID;
this.id = id;
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public V setValue(V value) {
V oldValue = this.value;
this.value = value;
return oldValue;
}
@Override
public String toString() {
return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}';
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MapRecord<?, ?> mapRecord = (MapRecord<?, ?>) o;
if (collectionID != mapRecord.collectionID)
return false;
if (id != mapRecord.id)
return false;
if (!Objects.equals(key, mapRecord.key))
return false;
return Objects.equals(value, mapRecord.value);
}
@Override
public int hashCode() {
int result = (int) (collectionID ^ (collectionID >>> 32));
result = 31 * result + (int) (id ^ (id >>> 32));
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
}
public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
this.collectionId = collectionId;
this.journal = journal;
this.idGenerator = idGenerator;
this.persister = persister;
this.recordType = recordType;
this.exceptionListener = ioExceptionListener;
this.completionSupplier = completionSupplier;
this.contextProvider = contextProvider;
}
C context;
LongFunction<C> contextProvider;
private final Persister<MapRecord<K, V>> persister;
private final Journal journal;
private final long collectionId;
private final byte recordType;
private final LongSupplier idGenerator;
private final Supplier<IOCompletion> completionSupplier;
private final IOCriticalErrorListener exceptionListener;
private final Map<K, MapRecord<K, V>> map = new HashMap<>();
public long getCollectionId() {
return collectionId;
}
@Override
public synchronized int size() {
return map.size();
}
public C getContext() {
if (context == null && contextProvider != null) {
context = contextProvider.apply(this.collectionId);
}
return context;
}
public JournalHashMap<K, V, C> setContext(C context) {
this.context = context;
return this;
}
@Override
public synchronized boolean isEmpty() {
return map.isEmpty();
}
@Override
public synchronized boolean containsKey(Object key) {
return map.containsKey(key);
}
@Override
public synchronized boolean containsValue(Object value) {
for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
if (value.equals(entry.getValue().value)) {
return true;
}
}
return false;
}
@Override
public synchronized V get(Object key) {
MapRecord<K, V> record = map.get(key);
if (record == null) {
return null;
} else {
return record.value;
}
}
/** This is to be called from a single thread during reload, so it does not need to be synchronized. */
public void reload(MapRecord<K, V> reloadValue) {
map.put(reloadValue.getKey(), reloadValue);
}
@Override
public synchronized V put(K key, V value) {
logger.debug("adding {} = {}", key, value);
long id = idGenerator.getAsLong();
MapRecord<K, V> record = new MapRecord<>(collectionId, id, key, value);
store(record);
MapRecord<K, V> oldRecord = map.put(key, record);
if (oldRecord != null) {
removed(oldRecord);
return oldRecord.value;
} else {
return null;
}
}
private synchronized void store(MapRecord<K, V> record) {
try {
IOCompletion callback = null;
if (completionSupplier != null) {
callback = completionSupplier.get();
}
if (callback == null) {
journal.appendAddRecord(record.id, recordType, persister, record, false);
} else {
journal.appendAddRecord(record.id, recordType, persister, record, true, callback);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
exceptionListener.onIOException(e, e.getMessage(), null);
}
}
// callers must be synchronized
private void removed(MapRecord<K, V> record) {
try {
journal.appendDeleteRecord(record.id, false);
} catch (Exception e) {
exceptionListener.onIOException(e, e.getMessage(), null);
}
}
// callers must be synchronized
private void removed(MapRecord<K, V> record, long txid) {
try {
journal.appendDeleteRecordTransactional(txid, record.id);
} catch (Exception e) {
exceptionListener.onIOException(e, e.getMessage(), null);
}
}
@Override
public synchronized V remove(Object key) {
MapRecord<K, V> record = map.remove(key);
this.removed(record);
return record.value;
}
/** This method will remove the element from the HashMap immediately, while the journal record removal is still part of a transaction.
* Rollbacks are not honoured here: rolling back the transaction would not place the element back in the map.
* The intent is to keep the operation atomic in case of a failure; an appendRollback is not expected. */
public synchronized V remove(Object key, long transactionID) {
MapRecord<K, V> record = map.remove(key);
this.removed(record, transactionID);
return record.value;
}
@Override
public synchronized void putAll(Map<? extends K, ? extends V> m) {
m.forEach(this::put);
}
@Override
public synchronized void clear() {
map.values().forEach(this::remove);
map.clear();
}
@Override
public Set<K> keySet() {
return map.keySet();
}
/** Not implemented yet, you may use valuesCopy.*/
@Override
public Collection<V> values() {
throw new UnsupportedOperationException("not implemented yet. You may use valuesCopy");
}
public synchronized Collection<V> valuesCopy() {
ArrayList<V> values = new ArrayList<>(map.size());
map.values().forEach(v -> values.add(v.value));
return values;
}
/** Not implemented yet, you may use entrySetCopy. */
@Override
public synchronized Set<Entry<K, V>> entrySet() {
throw new UnsupportedOperationException("not implemented yet. You may use entrySetCopy");
}
public synchronized Set<Entry<K, V>> entrySetCopy() {
return new HashSet<>(map.values());
}
@Override
public synchronized void forEach(BiConsumer<? super K, ? super V> action) {
Objects.requireNonNull(action);
map.forEach((a, b) -> action.accept(b.key, b.value));
}
}
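
As the remove(Object, long) javadoc above explains, the entry leaves the in-memory map at once while the journal delete joins a transaction. A hedged sketch of the calling pattern, mirroring how AckManager.retryPage uses it later in this commit (map, key and storageManager are assumed to be in scope):

TransactionImpl tx = new TransactionImpl(storageManager).setAsync(true);
map.remove(key, tx.getID());  // entry is gone from the map now; the journal delete joins tx
tx.setContainsPersistent();
tx.commit();                  // the delete becomes durable; rolling back would NOT restore the map entry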

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.journal.collections;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.persistence.Persister;
public class JournalHashMapProvider<K, V, C> {
final Journal journal;
final Persister<JournalHashMap.MapRecord<K, V>> persister;
final LongObjectHashMap<JournalHashMap<K, V, C>> journalMaps = new LongObjectHashMap<>();
final LongSupplier idSupplier;
final byte recordType;
final IOCriticalErrorListener ioExceptionListener;
final Supplier<IOCompletion> ioCompletionSupplier;
final LongFunction<C> contextProvider;
public JournalHashMapProvider(LongSupplier idSupplier, Journal journal, AbstractHashMapPersister<K, V> persister, byte recordType, Supplier<IOCompletion> ioCompletionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
this.idSupplier = idSupplier;
this.persister = persister;
this.journal = journal;
this.recordType = recordType;
this.ioExceptionListener = ioExceptionListener;
this.contextProvider = contextProvider;
this.ioCompletionSupplier = ioCompletionSupplier;
}
public List<JournalHashMap<K, V, C>> getMaps() {
ArrayList<JournalHashMap<K, V, C>> maps = new ArrayList<>();
journalMaps.values().forEach(maps::add);
return maps;
}
public void clear() {
journalMaps.clear();
}
public void reload(RecordInfo recordInfo) {
JournalHashMap.MapRecord<K, V> mapRecord = persister.decode(recordInfo.wrapData(), null, null);
getMap(mapRecord.collectionID, null).reload(mapRecord);
}
public Iterator<JournalHashMap<K, V, C>> iterMaps() {
return journalMaps.values().iterator();
}
public synchronized JournalHashMap<K, V, C> getMap(long collectionID, C context) {
JournalHashMap<K, V, C> journalHashMap = journalMaps.get(collectionID);
if (journalHashMap == null) {
journalHashMap = new JournalHashMap<>(collectionID, journal, idSupplier, persister, recordType, ioCompletionSupplier, contextProvider, ioExceptionListener).setContext(context);
journalMaps.put(collectionID, journalHashMap);
}
return journalHashMap;
}
public JournalHashMap<K, V, C> getMap(long collectionID) {
return getMap(collectionID, null);
}
}
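
A hedged wiring sketch for the provider. The journal, ID source and record type here are illustrative assumptions (the commit itself wires the StorageManager's message journal, generateID() and JournalRecordIds.ACK_RETRY inside AckManager), and LongPairPersister is the sketch shown after AbstractHashMapPersister:

LongSupplier idGenerator = new AtomicLong()::incrementAndGet;      // placeholder ID source
JournalHashMapProvider<Long, Long, Void> provider =
   new JournalHashMapProvider<>(idGenerator, journal, new LongPairPersister(),
                                (byte) 100,                         // hypothetical user record type
                                null,                               // no IOCompletion supplier: non-blocking appends
                                null,                               // no context provider
                                (error, message, file) -> { });     // IO critical error listener stub
JournalHashMap<Long, Long, Void> map = provider.getMap(1L);         // collectionID 1
map.put(10L, 20L);   // appends an add record to the journal
map.remove(10L);     // appends a delete record for the same journal id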

View File

@ -434,6 +434,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public void setPaged() {
super.setPaged();
largeBody.setPaged();
}

View File

@ -208,6 +208,8 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected boolean expirationReload = false;
protected long scheduledTime = -1;
protected boolean isPaged;
// The Proton based AMQP message sections that are retained in memory; these are the
// mutable portions of the Message as the broker sees it, although AMQP defines that
// the Properties and ApplicationProperties are immutable, so care should be taken
@ -289,6 +291,16 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return MessageDataScanningStatus.valueOf(messageDataScanned);
}
@Override
public boolean isPaged() {
return isPaged;
}
@Override
public void setPaged() {
isPaged = true;
}
/** This will return the application properties without attempting to decode them.
* That means, if applicationProperties were never parsed before, this will return null even if there are application properties.
* This was created as an internal method for testing, as we need to validate that the application properties are not decoded until needed. */

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
@ -74,6 +75,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private final ActiveMQServer server;
private AckManager ackRetryManager;
private ReferenceIDSupplier referenceIDSupplier;
private final ProtonProtocolManagerFactory factory;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
@ -25,15 +26,21 @@ import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(service = ProtocolManagerFactory.class)
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<AmqpInterceptor> {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String AMQP_PROTOCOL_NAME = "AMQP";
private static final String MODULE_NAME = "artemis-amqp-protocol";
@ -79,6 +86,12 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
*/
@Override
public void loadProtocolServices(ActiveMQServer server, List<ActiveMQComponent> services) {
try {
AckManager ackManager = AckManagerProvider.getManager(server, false);
server.registerRecordsLoader(ackManager::reload);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
final List<AMQPBrokerConnectConfiguration> amqpServicesConfigurations = server.getConfiguration().getAMQPConnection();
if (amqpServicesConfigurations != null && amqpServicesConfigurations.size() > 0) {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -40,6 +42,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
@ -102,6 +105,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
private final AMQPBrokerConnection brokerConnection;
private final boolean sync;
private final PagedRouteContext pagedRouteContext;
final AMQPMirrorBrokerConnectionElement replicaConfig;
boolean started;
@ -122,6 +127,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
AMQPBrokerConnection brokerConnection) {
super(server);
assert snfQueue != null;
this.replicaConfig = replicaConfig;
this.snfQueue = snfQueue;
this.server = server;
@ -132,6 +138,15 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
this.acks = replicaConfig.isMessageAcknowledgements();
this.brokerConnection = brokerConnection;
this.sync = replicaConfig.isSync();
this.pagedRouteContext = new PagedRouteContext(snfQueue);
if (sync) {
logger.debug("Mirror is configured to sync, so pageStore={} being enforced to BLOCK, and not page", snfQueue.getName());
snfQueue.getPagingStore().enforceAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
} else {
logger.debug("Mirror is configured to not sync, so pageStore={} being enforced to PAGE", snfQueue.getName());
snfQueue.getPagingStore().enforceAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
}
}
public Queue getSnfQueue() {
@ -250,6 +265,19 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return (remoteID != null && sourceID != null && remoteID.equals(sourceID));
}
Message copyMessageForPaging(Message message) {
long newID = server.getStorageManager().generateID();
long originalID = message.getMessageID();
if (logger.isTraceEnabled()) {
logger.trace("copying message {} as {}", originalID, newID);
}
message = message.copy(newID, false);
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, originalID);
return message;
}
@Override
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);
@ -281,6 +309,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return;
}
// This will attempt to store the message on paging; the decorator copies the message as it is written to the page.
if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
return;
}
if (message.isPaged()) {
// If the source message was paged, we copy it, because the ACK will happen on different queues.
// We can only share additional references on the queue when not in page mode;
// otherwise it must be a copy.
// This also works better with large messages.
message = copyMessageForPaging(message);
}
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
setProtocolData(ref, nodeID, idSupplier.getID(ref), context);
@ -347,6 +389,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
logger.trace("validating protocol data, adding protocol data for {}", ref);
setProtocolData(referenceIDSupplier, ref);
}
}
@ -419,11 +462,17 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
@Override
public void preAcknowledge(final Transaction tx, final MessageReference ref, final AckReason reason) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("postACKInternalMessage::tx={}, ref={}, reason={}", tx, ref, reason);
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
}
MirrorController controllerInUse = getControllerInUse();
// Retried ACKs are not forwarded.
// They were already confirmed and stored for a later ACK, which is what is happening now.
if (controllerInUse != null && controllerInUse.isRetryACK()) {
return;
}
if (!acks || ref.getQueue().isMirrorController()) { // we don't call preAcknowledge on SNF queues, otherwise we would get an infinite loop because of this feedback
return;
}
@ -617,4 +666,55 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
server.getPostOffice().route(message, ctx, false);
}
static class PagedRouteContext implements RouteContextList {
private final List<Queue> durableQueues;
private final List<Queue> nonDurableQueues;
PagedRouteContext(Queue snfQueue) {
ArrayList<Queue> queues = new ArrayList<>(1);
queues.add(snfQueue);
if (snfQueue.isDurable()) {
durableQueues = queues;
nonDurableQueues = Collections.emptyList();
} else {
durableQueues = Collections.emptyList();
nonDurableQueues = queues;
}
}
@Override
public int getNumberOfNonDurableQueues() {
return nonDurableQueues.size();
}
@Override
public int getNumberOfDurableQueues() {
return durableQueues.size();
}
@Override
public List<Queue> getDurableQueues() {
return durableQueues;
}
@Override
public List<Queue> getNonDurableQueues() {
return nonDurableQueues;
}
@Override
public void addAckedQueue(Queue queue) {
}
@Override
public boolean isAlreadyAcked(Queue queue) {
return false;
}
}
}

View File

@ -17,8 +17,6 @@
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.Collection;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
@ -26,8 +24,6 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.RunnableCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
@ -176,6 +172,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
private MessageReader coreLargeMessageReader;
private AckManager ackManager;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
@ -243,10 +241,13 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
} else if (eventType.equals(POST_ACK)) {
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, BROKER_ID);
logger.trace("ACK towards NodeID = {}, while the localNodeID={}", nodeID, server.getNodeID());
AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(amqpMessage);
if (nodeID == null) {
nodeID = getRemoteMirrorId(); // not sending the nodeID means it's data generated on that broker
logger.trace("Replacing nodeID by {}", nodeID);
}
String queueName = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(amqpMessage, QUEUE);
AmqpValue value = (AmqpValue) amqpMessage.getBody();
@ -392,72 +393,26 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
server.getIdentity(), queue, messageID, messageID, targetQueue);
}
performAck(nodeID, messageID, targetQueue, ackMessage, reason, (short)0);
performAck(nodeID, targetQueue, messageID, ackMessage, reason);
return true;
}
public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
private void performAck(String nodeID,
Queue targetQueue,
long messageID,
ACKMessageOperation ackMessageOperation, AckReason reason) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName());
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry);
}
switch (retry) {
case 0:
// first retry, after IO Operations
sessionSPI.getSessionContext().executeOnCompletion(new RunnableCallback(() -> performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short) 1)));
return;
case 1:
// second retry after the queue is flushed the temporary adds
targetQueue.flushOnIntermediate(() -> {
recoverContext();
performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short)2);
});
return;
case 2:
// third retry, on paging
if (reason != AckReason.EXPIRED) {
// if expired, we don't need to check on paging
// as the message will expire again when depaged (if on paging)
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
return;
} else {
connection.runNow(ackMessageOperation);
}
}
if (ackManager == null) {
ackManager = AckManagerProvider.getManager(server, true);
}
if (reference != null) {
if (logger.isTraceEnabled()) {
logger.trace("Post ack Server {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue);
}
try {
switch (reason) {
case EXPIRED:
targetQueue.expire(reference, null, false);
break;
default:
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(transaction, reference, reason, null, false);
transaction.commit();
break;
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
ackManager.ack(nodeID, targetQueue, messageID, reason, true);
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
}
/**
@ -565,68 +520,4 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
// Do nothing
}
class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {
final Queue targetQueue;
final String nodeID;
final long messageID;
final IOCallback operation;
PageAck(Queue targetQueue, String nodeID, long messageID, IOCallback operation) {
this.targetQueue = targetQueue;
this.nodeID = nodeID;
this.messageID = messageID;
this.operation = operation;
}
/**
* Method to retry the ack before a scan
*/
@Override
public boolean getAsBoolean() {
try {
recoverContext();
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null) {
return false;
} else {
TransactionImpl tx = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(tx, reference, AckReason.NORMAL, null, false);
tx.commit();
OperationContextImpl.getContext().executeOnCompletion(operation);
return true;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return false;
}
}
@Override
public int applyAsInt(PagedReference reference) {
String refNodeID = referenceNodeStore.getServerID(reference);
long refMessageID = referenceNodeStore.getID(reference);
if (refNodeID == null) {
refNodeID = referenceNodeStore.getDefaultNodeID();
}
if (refNodeID.equals(nodeID)) {
long diff = refMessageID - messageID;
if (diff == 0) {
return 0;
} else if (diff > 0) {
return 1;
} else {
return -1;
}
} else {
return -1;
}
}
@Override
public void run() {
operation.done();
}
}
}

View File

@ -0,0 +1,516 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AckManager implements ActiveMQComponent {
// we first retry on the queue a few times
private static final short MIN_QUEUE_ATTEMPTS = Short.parseShort(System.getProperty(AckRetry.class.getName() + ".MIN_QUEUE_ATTEMPTS", "5"));
private static final short MAX_PAGE_ATTEMPTS = Short.parseShort(System.getProperty(AckRetry.class.getName() + ".MAX_PAGE_ATTEMPT", "2"));
public static final int RETRY_DELAY = Integer.parseInt(System.getProperty(AckRetry.class.getName() + ".RETRY_DELAY", "100"));
private static DisabledAckMirrorController disabledAckMirrorController = new DisabledAckMirrorController();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final Journal journal;
final LongSupplier sequenceGenerator;
final JournalHashMapProvider<AckRetry, AckRetry, Queue> journalHashMapProvider;
final ActiveMQServer server;
final ReferenceIDSupplier referenceIDSupplier;
final IOCriticalErrorListener ioCriticalErrorListener;
volatile MultiStepProgress progress;
ActiveMQScheduledComponent scheduledComponent;
public AckManager(ActiveMQServer server) {
this.server = server;
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
this.journal = server.getStorageManager().getMessageJournal();
this.sequenceGenerator = server.getStorageManager()::generateID;
journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, journal, AckRetry.getPersister(), JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, server.getPostOffice()::findQueue, server.getIoCriticalErrorListener());
this.referenceIDSupplier = new ReferenceIDSupplier(server);
}
public void reload(RecordInfo recordInfo) {
journalHashMapProvider.reload(recordInfo);
}
@Override
public synchronized void stop() {
if (scheduledComponent != null) {
scheduledComponent.stop();
scheduledComponent = null;
}
AckManagerProvider.remove(this.server);
logger.debug("Stopping ackmanager on server {}", server);
}
@Override
public synchronized boolean isStarted() {
return scheduledComponent != null && scheduledComponent.isStarted();
}
@Override
public synchronized void start() {
logger.debug("Starting ACKManager on {} with period = {}", server, RETRY_DELAY);
if (!isStarted()) {
scheduledComponent = new ActiveMQScheduledComponent(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), RETRY_DELAY, RETRY_DELAY, TimeUnit.MILLISECONDS, true) {
@Override
public void run() {
beginRetry();
}
};
scheduledComponent.start();
scheduledComponent.delay();
} else {
logger.debug("Starting ignored on server {}", server);
}
}
public void beginRetry() {
logger.trace("being retry server {}", server);
if (initRetry()) {
logger.trace("Starting process to retry, server={}", server);
progress.nextStep();
} else {
logger.trace("Retry already happened");
}
}
public void endRetry() {
logger.trace("Retry done on server {}", server);
progress = null;
// schedule a retry
if (!sortRetries().isEmpty()) {
scheduledComponent.delay();
}
}
public boolean initRetry() {
if (progress != null) {
logger.trace("Retry already in progress, we will wait next time, server={}", server);
return false;
}
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retries = sortRetries();
if (retries.isEmpty()) {
logger.trace("Nothing to retry!, server={}", server);
return false;
}
progress = new MultiStepProgress(sortRetries());
return true;
}
// Sort the ACK list by address.
// We keep the retries per queue, but we need to group them by address,
// as we perform all the retries for the same address at the same time (in the multicast case, with multiple queues acking).
public HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortRetries() {
// We will group the retries by address,
// so we process all of the queues on the same address at once.
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retriesByAddress = new HashMap<>();
Iterator<JournalHashMap<AckRetry, AckRetry, Queue>> queueRetriesIterator = journalHashMapProvider.getMaps().iterator();
while (queueRetriesIterator.hasNext()) {
JournalHashMap<AckRetry, AckRetry, Queue> ackRetries = queueRetriesIterator.next();
if (!ackRetries.isEmpty()) {
Queue queue = ackRetries.getContext();
if (queue != null) {
SimpleString address = queue.getAddress();
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queueRetriesOnAddress = retriesByAddress.get(address);
if (queueRetriesOnAddress == null) {
queueRetriesOnAddress = new LongObjectHashMap<>();
retriesByAddress.put(address, queueRetriesOnAddress);
}
queueRetriesOnAddress.put(queue.getID(), ackRetries);
}
}
}
return retriesByAddress;
}
private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
AtomicBoolean empty = new AtomicBoolean(true);
queuesToRetry.forEach((id, journalHashMap) -> {
if (!journalHashMap.isEmpty()) {
empty.set(false);
}
});
return empty.get();
}
// to be called on the PagingStore's executor
public boolean retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
boolean retriedPaging = false;
logger.debug("retrying address {} on server {}", address, server);
try {
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
if (checkRetriesAndPaging(queuesToRetry)) {
logger.debug("scanning paging for {}", address);
AckRetry key = new AckRetry();
PagingStore store = server.getPagingManager().getPageStore(address);
for (long pageId = store.getFirstPage(); pageId <= store.getCurrentWritingPage(); pageId++) {
if (isEmpty(queuesToRetry)) {
logger.trace("Retry stopped while reading page {} on address {} as the outcome is now empty, server={}", pageId, address, server);
break;
}
Page page = store.usePage(pageId, true, false);
if (page == null) {
continue;
}
try {
if (retryPage(queuesToRetry, page, key)) {
retriedPaging = true;
}
} finally {
page.usageDown();
}
}
validateExpiredSet(queuesToRetry);
} else {
logger.debug("Page Scan not required for address {}", address);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
AMQPMirrorControllerTarget.setControllerInUse(previousController);
}
return retriedPaging;
}
private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
queuesToRetry.forEach(this::validateExpireSet);
}
private void validateExpireSet(long queueID, JournalHashMap<AckRetry, AckRetry, Queue> retries) {
for (AckRetry retry : retries.valuesCopy()) {
if (retry.getQueueAttempts() >= MIN_QUEUE_ATTEMPTS) {
if (retry.attemptedPage() >= MAX_PAGE_ATTEMPTS) {
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now", retry, retry.getPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Retry {} attempted {} times on paging", retry, retry.getPageAttempts());
}
}
}
}
}
private boolean retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry,
Page page,
AckRetry key) throws Exception {
AtomicBoolean retriedPaging = new AtomicBoolean(false);
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
// scan each page for acks
page.getMessages().forEach(pagedMessage -> {
for (int i = 0; i < pagedMessage.getQueueIDs().length; i++) {
long queueID = pagedMessage.getQueueIDs()[i];
JournalHashMap<AckRetry, AckRetry, Queue> retries = queuesToRetry.get(queueID);
if (retries != null) {
String serverID = referenceIDSupplier.getServerID(pagedMessage.getMessage());
if (serverID == null) {
serverID = referenceIDSupplier.getDefaultNodeID();
}
long id = referenceIDSupplier.getID(pagedMessage.getMessage());
logger.debug("Looking for retry on serverID={}, id={} on server={}", serverID, id, server);
key.setNodeID(serverID).setMessageID(id);
AckRetry foundRetry = retries.get(key);
// We retry messages on the queue first;
// this avoids racing with in-transit messages that are still being depaged into the queue.
if (foundRetry != null && foundRetry.getQueueAttempts() > MIN_QUEUE_ATTEMPTS) {
Queue queue = retries.getContext();
if (queue != null) {
PageSubscription subscription = queue.getPageSubscription();
if (!subscription.isAcked(pagedMessage)) {
PagedReference reference = retries.getContext().getPagingStore().getCursorProvider().newReference(pagedMessage, subscription);
try {
subscription.ackTx(transaction, reference);
retriedPaging.set(true);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (ioCriticalErrorListener != null) {
ioCriticalErrorListener.onIOException(e, e.getMessage(), null);
}
}
}
retries.remove(foundRetry, transaction.getID());
transaction.setContainsPersistent();
logger.debug("retry found = {} for message={} on queue", foundRetry, pagedMessage);
}
}
} else {
logger.trace("Retry key={} not found server={}", key, server);
}
}
});
try {
transaction.commit();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (ioCriticalErrorListener != null) {
ioCriticalErrorListener.onIOException(e, e.getMessage(), null);
}
}
return retriedPaging.get();
}
/** returns true if there are retries ready to be scanned on paging */
private boolean checkRetriesAndPaging(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
boolean needScanOnPaging = false;
Iterator<Map.Entry<Long, JournalHashMap<AckRetry, AckRetry, Queue>>> iter = queuesToRetry.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Long, JournalHashMap<AckRetry, AckRetry, Queue>> entry = iter.next();
JournalHashMap<AckRetry, AckRetry, Queue> queueRetries = entry.getValue();
Queue queue = queueRetries.getContext();
for (AckRetry retry : queueRetries.valuesCopy()) {
if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) {
logger.debug("Removing retry {} as the retry went ok", retry);
queueRetries.remove(retry);
} else {
int retried = retry.attemptedQueue();
if (logger.isDebugEnabled()) {
logger.debug("retry {} attempted {} times on the queue", retry, retried);
}
if (retried >= MIN_QUEUE_ATTEMPTS) {
needScanOnPaging = true;
}
}
}
}
return needScanOnPaging;
}
public synchronized void addRetry(String nodeID, Queue queue, long messageID, AckReason reason) {
if (nodeID == null) {
nodeID = referenceIDSupplier.getDefaultNodeID();
}
AckRetry retry = new AckRetry(nodeID, messageID, reason);
journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
if (scheduledComponent != null) {
scheduledComponent.delay();
}
}
public boolean ack(String nodeID, Queue targetQueue, long messageID, AckReason reason, boolean allowRetry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}, allowRetry={})", nodeID, messageID, targetQueue.getName(), allowRetry);
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceIDSupplier);
if (reference == null) {
logger.debug("Could not find retry on reference nodeID={} (while localID={}), messageID={} on queue {}, server={}", nodeID, referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(), server);
if (allowRetry) {
addRetry(nodeID, targetQueue, messageID, reason);
}
return false;
} else {
if (logger.isTraceEnabled()) {
logger.trace("ack {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue);
}
doACK(targetQueue, reference, reason);
return true;
}
}
private void doACK(Queue targetQueue, MessageReference reference, AckReason reason) {
try {
switch (reason) {
case EXPIRED:
targetQueue.expire(reference, null, false);
break;
default:
TransactionImpl transaction = new TransactionImpl(server.getStorageManager()).setAsync(true);
targetQueue.acknowledge(transaction, reference, reason, null, false);
transaction.commit();
break;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
/** The AckManager will perform the retries on each address's pageStore executor,
* processing each address individually, one at a time. */
class MultiStepProgress {
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retryList;
Iterator<Map.Entry<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>>> retryIterator;
boolean retriedPaging = false;
MultiStepProgress(HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> retryList) {
this.retryList = retryList;
retryIterator = retryList.entrySet().iterator();
}
public void nextStep() {
try {
if (!retryIterator.hasNext()) {
if (retriedPaging) {
logger.debug("Retried acks on paging, better to rebuild the page counters");
server.getPagingManager().rebuildCounters(null);
}
logger.trace("Iterator is done on retry, server={}", server);
AckManager.this.endRetry();
} else {
Map.Entry<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> entry = retryIterator.next();
//////////////////////////////////////////////////////////////////////
// Issue a deliverAsync on each queue before doing the retries
// to make it more likely to hit the ack retry on each queue
entry.getValue().values().forEach(this::deliveryAsync);
PagingStore pagingStore = server.getPagingManager().getPageStore(entry.getKey());
pagingStore.execute(() -> {
if (AckManager.this.retryAddress(entry.getKey(), entry.getValue())) {
retriedPaging = true;
}
nextStep();
});
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
// there was an exception; clear the current progress to allow a new attempt
AckManager.this.endRetry();
}
}
private void deliveryAsync(JournalHashMap<AckRetry, AckRetry, Queue> map) {
Queue queue = map.getContext();
if (queue != null) {
queue.deliverAsync();
}
}
}
private static class DisabledAckMirrorController implements MirrorController {
@Override
public boolean isRetryACK() {
return true;
}
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
}
@Override
public void deleteAddress(AddressInfo addressInfo) throws Exception {
}
@Override
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
}
@Override
public void deleteQueue(SimpleString addressName, SimpleString queueName) throws Exception {
}
@Override
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) throws Exception {
}
@Override
public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
}
@Override
public String getRemoteMirrorId() {
return null;
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AckManagerProvider {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static HashMap<ActiveMQServer, AckManager> managerHashMap = new HashMap<>();
public static void reset() {
managerHashMap.clear();
}
public static void remove(ActiveMQServer server) {
logger.info("Removing {}", server);
synchronized (managerHashMap) {
managerHashMap.remove(server);
}
}
public static int getSize() {
synchronized (managerHashMap) {
return managerHashMap.size();
}
}
public static AckManager getManager(ActiveMQServer server, boolean start) {
synchronized (managerHashMap) {
AckManager ackManager = managerHashMap.get(server);
if (ackManager != null) {
if (start && !ackManager.isStarted()) {
ackManager.start();
}
return ackManager;
}
ackManager = new AckManager(server);
managerHashMap.put(server, ackManager);
try {
server.addExternalComponent(ackManager, false);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
if (start) {
ackManager.start();
}
return ackManager;
}
}
}
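
Putting the pieces together, a condensed sketch grounded in this diff (ProtonProtocolManagerFactory.loadProtocolServices registers the reload hook, and AMQPMirrorControllerTarget.performAck delegates to ack with allowRetry=true; nodeID, targetQueue and messageID are assumed to be in scope):

// On startup: obtain the per-server AckManager without starting it and let the broker
// replay journal records into it, repopulating the journal-backed retry maps.
AckManager ackManager = AckManagerProvider.getManager(server, false);
server.registerRecordsLoader(ackManager::reload);

// On a mirrored POST_ACK: try the ack immediately; when the reference is not found yet,
// allowRetry=true stores an AckRetry and the scheduled component retries it later,
// eventually scanning paging for the message.
AckManagerProvider.getManager(server, true)
                  .ack(nodeID, targetQueue, messageID, AckReason.NORMAL, true);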

View File

@ -70,15 +70,15 @@ public class ReferenceIDSupplier implements NodeStoreFactory<MessageReference> {
public long getID(MessageReference element) {
Message message = element.getMessage();
Long id = getID(message);
if (id == null) {
return element.getMessageID();
} else {
return id;
}
return getID(message);
}
private Long getID(Message message) {
return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
public Long getID(Message message) {
Long messageID = (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
if (messageID == null) {
return message.getMessageID();
} else {
return messageID;
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging;
import java.io.File;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
@ -69,6 +70,10 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
AddressFullMessagePolicy getAddressFullMessagePolicy();
default PagingStore enforceAddressFullMessagePolicy(AddressFullMessagePolicy enforcedAddressFullMessagePolicy) {
return this;
}
PageFullMessagePolicy getPageFullMessagePolicy();
Long getPageLimitMessages();
@ -126,10 +131,13 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception;
boolean page(Message message, Transaction tx, RouteContextList listCtx, Function<Message, Message> pageDecorator) throws Exception;
Page usePage(long page);
/** Use this method when you want to use the cache of used pages. If you are just using it offline (e.g. print-data), use the newPageObject method. */
Page usePage(long page, boolean create);
Page usePage(long page, boolean createEntry, boolean createFile);
Page newPageObject(long page) throws Exception;
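
Both additions above are exercised by the mirror source earlier in this commit; a condensed, hedged recap of that usage (sync, message, tx and pagedRouteContext as in AMQPMirrorControllerSource):

// Force the SNF store to BLOCK (sync mirror) or PAGE (async mirror)...
snfQueue.getPagingStore().enforceAddressFullMessagePolicy(
      sync ? AddressFullMessagePolicy.BLOCK : AddressFullMessagePolicy.PAGE);

// ...and page with a decorator that copies the message (tagging the original id) as it is
// written to the page file, so the stored copy can be acked independently of the source.
boolean stored = snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging);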

View File

@ -16,9 +16,7 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
@ -84,6 +82,8 @@ public interface PageSubscription {
boolean contains(PagedReference ref) throws Exception;
boolean isAcked(PagedMessage pagedMessage);
// for internal (cursor) classes
void confirmPosition(PagePosition ref) throws Exception;
@ -92,16 +92,6 @@ public interface PageSubscription {
// for internal (cursor) classes
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
/**
* This method will schedule a scan over paging; however, a retry should be attempted before the scan.
* @param retryBeforeScan if this function is called and returns true, the scan for this element will be skipped. It is then the caller's responsibility to call found.
* @param scanFunction
* @param found
* @param notFound
*/
void scanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound);
/**
* @return the first page in use or MAX_LONG if none is in use
*/

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -29,9 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
import io.netty.util.collection.IntObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -61,7 +60,6 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.core.server.impl.QueueImpl.DELIVERY_TIMEOUT;
@ -111,164 +109,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
private final AtomicLong deliveredSize = new AtomicLong(0);
/** this variable governs whether we need to schedule another runner to look after the scanList. */
private boolean pageScanNeeded = true;
private final LinkedList<PageScan> scanList = new LinkedList();
private static class PageScan {
final BooleanSupplier retryBeforeScan;
final ToIntFunction<PagedReference> scanFunction;
final Runnable found;
final Runnable notFound;
public ToIntFunction<PagedReference> getScanFunction() {
return scanFunction;
}
public Runnable getFound() {
return found;
}
public Runnable getNotfound() {
return notFound;
}
PageScan(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
this.retryBeforeScan = retryBeforeScan;
this.scanFunction = scanFunction;
this.found = found;
this.notFound = notFound;
}
}
@Override
public void scanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
PageScan scan = new PageScan(retryBeforeScan, scanFunction, found, notFound);
boolean pageScanNeededLocal;
synchronized (scanList) {
scanList.add(scan);
pageScanNeededLocal = this.pageScanNeeded;
this.pageScanNeeded = false;
}
if (pageScanNeededLocal) {
pageStore.execute(this::performScanAck);
}
}
private void performScanAck() {
try {
PageScan[] localScanList;
synchronized (scanList) {
this.pageScanNeeded = true;
if (scanList.size() == 0) {
return;
}
localScanList = scanList.stream().toArray(i -> new PageScan[i]);
scanList.clear();
}
int retriedFound = 0;
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen.retryBeforeScan != null && scanElemen.retryBeforeScan.getAsBoolean()) {
localScanList[i] = null;
retriedFound++;
}
}
if (retriedFound == localScanList.length) {
return;
}
if (!isPaging()) {
// this would mean that between the submit and now the system left paging mode
// at this point we will just return everything as notFound
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen != null && scanElemen.notFound != null) {
scanElemen.notFound.run();
}
}
return;
}
LinkedList<Runnable> afterCommitList = new LinkedList<>();
TransactionImpl tx = new TransactionImpl(store);
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
for (Runnable r : afterCommitList) {
try {
r.run();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
});
PageIterator iterator = this.iterator(true);
try {
while (iterator.hasNext()) {
PagedReference reference = iterator.next();
boolean keepMoving = false;
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen == null) {
continue;
}
int result = scanElemen.scanFunction.applyAsInt(reference);
if (result >= 0) {
if (result == 0) {
try {
PageSubscriptionImpl.this.ackTx(tx, reference);
if (scanElemen.found != null) {
afterCommitList.add(scanElemen.found);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} else {
if (scanElemen.notFound != null) {
scanElemen.notFound.run();
}
}
localScanList[i] = null;
} else {
keepMoving = true;
}
}
if (!keepMoving) {
break;
}
}
} finally {
iterator.close();
}
for (int i = 0; i < localScanList.length; i++) {
if (localScanList[i] != null && localScanList[i].notFound != null) {
localScanList[i].notFound.run();
}
localScanList[i] = null;
}
if (afterCommitList.size() > 0) {
try {
tx.commit();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
final StorageManager store,
@ -588,7 +428,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
PageTransactionInfo txInfo = getPageTransaction(reference);
if (txInfo != null) {
txInfo.storeUpdate(store, pageStore.getPagingManager(), tx);
tx.setContainsPersistent();
}
}
@Override
@ -618,6 +460,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
@Override
public boolean isAcked(PagedMessage pagedMessage) {
return getPageInfo(pagedMessage.getPageNumber()).isAck(pagedMessage.getMessageNumber());
}
@Override
public void confirmPosition(final PagePosition position) throws Exception {
// if we are dealing with a persistent cursor

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.Function;
/**
* @see PagingStore
@ -120,6 +121,12 @@ public class PagingStoreImpl implements PagingStore {
private volatile AddressFullMessagePolicy addressFullMessagePolicy;
// Internal components such as mirroring could enforce an address-full message policy
// differing from the one in AddressSettings.
// Example: the user configured sync mirroring while the default address-settings is PAGE; we must use BLOCK in that case.
// The user configured non-sync mirroring while DROP is configured; we must use PAGE (always paged).
private volatile AddressFullMessagePolicy enforcedAddressFullMessagePolicy;
private boolean printedDropMessagesWarning;
private final PagingManager pagingManager;
@ -242,7 +249,11 @@ public class PagingStoreImpl implements PagingStore {
// it can be reconfigured through jdbc-max-page-size-bytes in the JDBC configuration section
pageSize = storageManager.getAllowedPageSize(addressSettings.getPageSizeBytes());
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
if (enforcedAddressFullMessagePolicy != null) {
this.addressFullMessagePolicy = enforcedAddressFullMessagePolicy;
} else {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
}
rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
@ -433,6 +444,13 @@ public class PagingStoreImpl implements PagingStore {
return addressFullMessagePolicy;
}
@Override
public PagingStoreImpl enforceAddressFullMessagePolicy(AddressFullMessagePolicy enforcedAddressFullMessagePolicy) {
this.addressFullMessagePolicy = enforcedAddressFullMessagePolicy;
this.enforcedAddressFullMessagePolicy = enforcedAddressFullMessagePolicy;
return this;
}
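A hedged sketch of how an internal component (e.g. mirroring, per the comment above) could enforce the policy on a store; the SNF queue lookup is illustrative only:

// hypothetical sketch, assuming a Queue `snfQueue` backing the mirror's store-and-forward address
snfQueue.getPagingStore().enforceAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
// from now on the store reports BLOCK regardless of the configured address-settings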
@Override
public int getPageSizeBytes() {
return pageSize;
@ -808,14 +826,23 @@ public class PagingStoreImpl implements PagingStore {
@Override
public Page usePage(final long pageId, final boolean create) {
return usePage(pageId, create, create);
}
@Override
public Page usePage(final long pageId, final boolean createEntry, final boolean createFile) {
synchronized (usedPages) {
try {
Page page = usedPages.get(pageId);
if (create && page == null) {
if (createEntry && page == null) {
page = newPageObject(pageId);
if (page.getFile().exists()) {
page.getMessages();
injectPage(page);
} else {
if (!createFile) {
page = null;
}
}
}
if (page != null) {
@ -1143,6 +1170,14 @@ public class PagingStoreImpl implements PagingStore {
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx) throws Exception {
return page(message, tx, listCtx, null);
}
@Override
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
Function<Message, Message> pageDecorator) throws Exception {
if (!running) {
return false;
@ -1204,12 +1239,13 @@ public class PagingStoreImpl implements PagingStore {
return true;
}
return writePage(message, tx, listCtx);
return writePage(message, tx, listCtx, pageDecorator);
}
private boolean writePage(Message message,
Transaction tx,
RouteContextList listCtx) throws Exception {
RouteContextList listCtx,
Function<Message, Message> pageDecorator) throws Exception {
lock.writeLock().lock();
try {
@ -1217,13 +1253,17 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
// not using page transaction if transaction is declared async
final long transactionID = (tx == null || tx.isAsync()) ? -1 : tx.getID();
if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
if (pageDecorator != null) {
message = pageDecorator.apply(message);
}
message.setPaged();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
int bytesToWrite = pagedMessage.getEncodeSize() + PageReadWriter.SIZE_RECORD;
currentPageSize += bytesToWrite;
@ -1233,7 +1273,7 @@ public class PagingStoreImpl implements PagingStore {
currentPageSize += bytesToWrite;
}
if (tx != null) {
if (tx != null && !tx.isAsync()) {
installPageTransaction(tx, listCtx);
}
@ -1349,7 +1389,10 @@ public class PagingStoreImpl implements PagingStore {
tx.addOperation(pgOper);
}
pgOper.addStore(this);
if (!tx.isAsync()) {
pgOper.addStore(this);
}
pgOper.pageTransaction.increment(listCtx.getNumberOfDurableQueues(), listCtx.getNumberOfNonDurableQueues());
return;
@ -1408,7 +1451,9 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void beforeCommit(final Transaction tx) throws Exception {
syncStore();
if (!tx.isAsync()) {
syncStore();
}
storePageTX(tx);
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -33,6 +34,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -300,15 +302,29 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
return loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, null, pendingNonTXPageCounter, journalLoader);
}
default JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
return loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, largeMessagesInFolder, pendingNonTXPageCounter, journalLoader, null);
}
JournalLoadInformation loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception;
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, QueueBindingInfo> queueInfos,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader,
List<Consumer<RecordInfo>> extraRecordsLoader) throws Exception;
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;

View File

@ -132,6 +132,7 @@ import org.apache.activemq.artemis.utils.critical.CriticalMeasure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
/**
* Controls access to the journals and other storage files such as the ones used to store pages and
@ -978,7 +979,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final Set<Pair<Long, Long>> pendingLargeMessages,
final Set<Long> storedLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
final JournalLoader journalLoader) throws Exception {
final JournalLoader journalLoader,
final List<Consumer<RecordInfo>> journalRecordsListener) throws Exception {
SparseArrayLinkedList<RecordInfo> records = new SparseArrayLinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
@ -1324,7 +1326,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
default: {
throw new IllegalStateException("Invalid record type " + recordType);
logger.debug("Extra record type {}", record.userRecordType);
if (journalRecordsListener != null) {
journalRecordsListener.forEach(f -> f.accept(record));
}
}
}
} catch (RuntimeException e) {

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
@ -89,6 +90,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACK_RETRY;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
@ -765,6 +767,9 @@ public final class DescribeJournal {
case ROLE_RECORD:
return AbstractJournalStorageManager.newRoleEncoding(id, buffer);
case ACK_RETRY:
return AckRetry.getPersister().decode(buffer, null, null);
default:
return null;
}

View File

@ -106,4 +106,6 @@ public final class JournalRecordIds {
public static final byte CONNECTOR_RECORD = 51;
public static final byte ADDRESS_SETTING_RECORD_JSON = 52;
public static final byte ACK_RETRY = 53;
}

View File

@ -187,6 +187,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
@Override
public void setPaged() {
super.setPaged();
largeBody.setPaged();
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.collections.AbstractHashMapPersister;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.utils.DataConstants;
public final class AckRetry {
String nodeID;
byte[] temporaryNodeBytes;
long messageID;
AckReason reason;
short pageAttempts;
short queueAttempts;
private static Persister persister = new Persister();
public static Persister getPersister() {
return persister;
}
@Override
public String toString() {
return "ACKRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" + messageID + ", reason=" + reason + '}';
}
public AckRetry() {
}
public AckRetry(String nodeID, long messageID, AckReason reason) {
this.nodeID = nodeID;
this.messageID = messageID;
this.reason = reason;
}
public byte[] getTemporaryNodeBytes() {
if (temporaryNodeBytes == null) {
temporaryNodeBytes = nodeID.getBytes(StandardCharsets.US_ASCII);
}
return temporaryNodeBytes;
}
public void clearTemporaryNodeBytes() {
this.temporaryNodeBytes = null;
}
public String getNodeID() {
return nodeID;
}
public AckRetry setNodeID(String nodeID) {
this.nodeID = nodeID;
return this;
}
public long getMessageID() {
return messageID;
}
public AckRetry setMessageID(long messageID) {
this.messageID = messageID;
return this;
}
public AckReason getReason() {
return reason;
}
public AckRetry setReason(AckReason reason) {
this.reason = reason;
return this;
}
public short getPageAttempts() {
return pageAttempts;
}
public short getQueueAttempts() {
return queueAttempts;
}
public short attemptedPage() {
return ++pageAttempts;
}
public short attemptedQueue() {
return ++queueAttempts;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
AckRetry retry = (AckRetry) o;
if (messageID != retry.messageID)
return false;
return Objects.equals(nodeID, retry.nodeID);
}
@Override
public int hashCode() {
int result = nodeID != null ? nodeID.hashCode() : 0;
result = 31 * result + (int) (messageID ^ (messageID >>> 32));
return result;
}
public static class Persister extends AbstractHashMapPersister<AckRetry, AckRetry> {
private Persister() {
}
@Override
protected int getKeySize(AckRetry key) {
return DataConstants.SIZE_INT +
(key.getNodeID() == null ? 0 : key.getTemporaryNodeBytes().length) +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE;
}
@Override
protected void encodeKey(ActiveMQBuffer buffer, AckRetry key) {
if (key.getNodeID() == null) {
buffer.writeInt(0);
} else {
byte[] temporaryNodeBytes = key.getTemporaryNodeBytes();
buffer.writeInt(temporaryNodeBytes.length);
buffer.writeBytes(temporaryNodeBytes);
}
buffer.writeLong(key.messageID);
buffer.writeByte(key.reason.getVal());
key.clearTemporaryNodeBytes();
}
@Override
protected AckRetry decodeKey(ActiveMQBuffer buffer) {
int sizeBytes = buffer.readInt();
String nodeID;
if (sizeBytes == 0) {
nodeID = null;
} else {
byte[] temporaryNodeBytes = new byte[sizeBytes];
buffer.readBytes(temporaryNodeBytes);
nodeID = new String(temporaryNodeBytes, StandardCharsets.US_ASCII);
}
long messageID = buffer.readLong();
AckReason reason = AckReason.fromValue(buffer.readByte());
return new AckRetry(nodeID, messageID, reason);
}
@Override
protected int getValueSize(AckRetry value) {
return 0;
}
@Override
protected void encodeValue(ActiveMQBuffer buffer, AckRetry value) {
}
@Override
protected AckRetry decodeValue(ActiveMQBuffer buffer, AckRetry key) {
return key;
}
}
}
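A short sketch of the identity semantics encoded above: equals and hashCode use only nodeID and messageID, so the reason does not affect the key (the AckReason values below are assumed for illustration):

// hypothetical sketch
AckRetry a = new AckRetry("node-1", 1000L, AckReason.NORMAL);
AckRetry b = new AckRetry("node-1", 1000L, AckReason.EXPIRED);
assert a.equals(b); // same node and message id; the reason is not part of the identity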

View File

@ -138,6 +138,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
@Override
public void setPaged() {
super.setPaged();
}
@Override

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -35,6 +36,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -379,7 +381,8 @@ public class NullStorageManager implements StorageManager {
final Set<Pair<Long, Long>> pendingLargeMessages,
final Set<Long> storedLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
final JournalLoader journalLoader) throws Exception {
final JournalLoader journalLoader,
final List<Consumer<RecordInfo>> extraLoaders) throws Exception {
return new JournalLoadInformation();
}
@ -736,4 +739,6 @@ public class NullStorageManager implements StorageManager {
public void deleteID(long journalD) throws Exception {
}
}

View File

@ -23,6 +23,7 @@ import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -57,6 +58,8 @@ public interface AddressManager {
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
LocalQueueBinding findLocalBinding(long id);
void clear();
Binding getBinding(SimpleString queueName);

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@ -127,6 +128,17 @@ public interface PostOffice extends ActiveMQComponent {
*/
Bindings lookupBindingsForAddress(SimpleString address) throws Exception;
LocalQueueBinding findLocalBinding(long bindingID);
default Queue findQueue(final long bindingID) {
LocalQueueBinding binding = findLocalBinding(bindingID);
if (binding != null) {
return binding.getQueue();
} else {
return null;
}
}
/**
* Unlike lookupBindings, this will always create a new element on the Queue if non-existent
*

View File

@ -1080,6 +1080,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return addressManager.getExistingBindingsForRoutingAddress(address);
}
@Override
public LocalQueueBinding findLocalBinding(final long bindingID) {
return addressManager.findLocalBinding(bindingID);
}
@Override
public synchronized Binding getBinding(final SimpleString name) {
return addressManager.getBinding(name);

View File

@ -59,6 +59,8 @@ public class SimpleAddressManager implements AddressManager {
private final StorageManager storageManager;
private final ConcurrentMap<Long, LocalQueueBinding> localBindingsMap = new ConcurrentHashMap<>();
/**
* {@code HashMap<Address, Binding>}
*/
@ -87,6 +89,11 @@ public class SimpleAddressManager implements AddressManager {
this.metricsManager = metricsManager;
}
@Override
public LocalQueueBinding findLocalBinding(long bindingID) {
return localBindingsMap.get(bindingID);
}
@Override
public boolean addBinding(final Binding binding) throws Exception {
final Pair<Binding, Address> bindingAddressPair = new Pair<>(binding, new AddressImpl(binding.getAddress(), wildcardConfiguration));
@ -219,6 +226,10 @@ public class SimpleAddressManager implements AddressManager {
final Binding binding = bindings.removeBindingByUniqueName(bindableQueueName);
if (binding == null) {
throw new IllegalStateException("Cannot find binding " + bindableName);
} else {
if (binding instanceof LocalQueueBinding) {
localBindingsMap.remove(binding.getID());
}
}
if (bindings.getBindings().isEmpty()) {
mappings.remove(realAddress);
@ -272,6 +283,10 @@ public class SimpleAddressManager implements AddressManager {
bindings.addBinding(binding);
if (binding instanceof LocalQueueBinding) {
localBindingsMap.put(binding.getID(), (LocalQueueBinding) binding);
}
return addedNewBindings;
}

View File

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -37,6 +38,7 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationBrokerPlugin;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -182,6 +184,10 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void lockActivation();
/** The server has a default listener that will propagate errors to registered listeners.
* This returns the main listener. */
IOCriticalErrorListener getIoCriticalErrorListener();
/**
* Returns the resource to manage this ActiveMQ Artemis server.
*
@ -1011,4 +1017,6 @@ public interface ActiveMQServer extends ServiceComponent {
default String getStatus() {
return "";
}
void registerRecordsLoader(Consumer<RecordInfo> recordsLoader);
}

View File

@ -42,10 +42,6 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
long getMessageID();
/**
* We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged
*/
void setPaged();
/**

View File

@ -86,6 +86,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
@ -404,6 +405,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private String propertiesFileUrl;
private List<Consumer<RecordInfo>> extraRecordsLoader;
private final ActiveMQComponent networkCheckMonitor = new ActiveMQComponent() {
@Override
public void start() throws Exception {
@ -540,6 +543,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
@Override
public void registerRecordsLoader(Consumer<RecordInfo> recordsLoader) {
if (extraRecordsLoader == null) {
extraRecordsLoader = new ArrayList<>();
}
extraRecordsLoader.add(recordsLoader);
}
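A hedged sketch of how a component could use this hook to consume journal records the core loader does not recognize; the record-type check and the decoding step are illustrative:

// hypothetical sketch: register before the journal is loaded
server.registerRecordsLoader(record -> {
   if (record.userRecordType == JournalRecordIds.ACK_RETRY) {
      // decode record.data and repopulate the in-memory retry state
   }
});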
/**
* A Callback for tests
* @return
@ -1447,6 +1458,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
messagingServerControl = null;
memoryManager = null;
backupManager = null;
extraRecordsLoader = null;
this.storageManager = null;
sessions.clear();
@ -3469,6 +3481,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return;
}
loadProtocolServices();
pagingManager.reloadStores();
Set<Long> storedLargeMessages = new HashSet<>();
@ -3595,10 +3609,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
private void startProtocolServices() throws Exception {
private void loadProtocolServices() {
remotingService.loadProtocolServices(protocolServices);
}
private void startProtocolServices() throws Exception {
for (ProtocolManagerFactory protocolManagerFactory : protocolManagerFactories) {
protocolManagerFactory.loadProtocolServices(this, protocolServices);
}
@ -3858,7 +3873,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
storageManager.recoverLargeMessagesOnFolder(storedLargeMessages);
journalInfo[1] = storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueBindingInfosMap, duplicateIDMap, pendingLargeMessages, storedLargeMessages, pendingNonTXPageCounter, journalLoader);
journalInfo[1] = storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueBindingInfosMap, duplicateIDMap, pendingLargeMessages, storedLargeMessages, pendingNonTXPageCounter, journalLoader, extraRecordsLoader);
journalLoader.handleDuplicateIds(duplicateIDMap);
@ -3878,8 +3893,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
// TODO load users/roles
journalLoader.cleanUp();
return journalInfo;
@ -4814,4 +4827,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return managementLock::unlock;
}
}
@Override
public IOCriticalErrorListener getIoCriticalErrorListener() {
return ioCriticalErrorListener;
}
}

View File

@ -3502,6 +3502,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) {
checkIDSupplier(nodeStore);
doInternalPoll(); // we need to flush intermediate references first
MessageReference reference = messageReferences.removeWithID(serverID, id);
if (reference != null) {
refRemoved(reference);

View File

@ -29,6 +29,10 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
* This represents the contract we will use to send messages to replicas.
* */
public interface MirrorController {
default boolean isRetryACK() {
return false;
}
void addAddress(AddressInfo addressInfo) throws Exception;
void deleteAddress(AddressInfo addressInfo) throws Exception;
void createQueue(QueueConfiguration queueConfiguration) throws Exception;

View File

@ -308,6 +308,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void setPaged() {
}
@Override
public boolean isPaged() {
return false;
}
@Override
public String getProtocolName() {
// should normally not be visible in GUI

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -74,6 +75,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
import org.junit.Assert;
import org.junit.Test;
@ -649,7 +652,8 @@ public class TransactionImplTest extends ServerTestBase {
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
JournalLoader journalLoader,
List<Consumer<RecordInfo>> extraLoaders) throws Exception {
return null;
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -301,4 +302,9 @@ public class FakePostOffice implements PostOffice {
public RoutingStatus route(Message message, boolean direct) throws Exception {
return RoutingStatus.OK;
}
@Override
public LocalQueueBinding findLocalBinding(long bindingID) {
return null;
}
}

View File

@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
@ -538,6 +539,12 @@ public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
waitForServerToStart(slowServer);
waitForServerToStart(server);
Wait.waitFor(() -> server.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror") != null, 5000);
Queue snf = server.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror");
Assert.assertNotNull(snf);
// The mirror is configured to block; we cannot allow anything other than BLOCK here
Assert.assertEquals(AddressFullMessagePolicy.BLOCK, snf.getPagingStore().getAddressFullMessagePolicy());
server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));

View File

@ -0,0 +1,322 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AckManagerTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server1;
private static final String SNF_NAME = "$ACTIVEMQ_ARTEMIS_MIRROR_other";
@Before
@Override
public void setUp() throws Exception {
super.setUp();
server1 = createServer(true, createDefaultConfig(0, true), 100024, -1, -1, -1);
server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1));
server1.getConfiguration().getAcceptorConfigurations().clear();
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
server1.start();
}
@Test
public void testDirectACK() throws Throwable {
String protocol = "AMQP";
SimpleString TOPIC_NAME = SimpleString.toSimpleString("tp" + RandomUtil.randomString());
server1.addAddressInfo(new AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
// creating 5 subscriptions
for (int i = 0; i < 5; i++) {
try (Connection connection = connectionFactory.createConnection()) {
connection.setClientID("c" + i);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME.toString());
session.createDurableSubscriber(topic, "s" + i);
}
}
int numberOfMessages = 500;
int numberOfAcksC1 = 100;
int numberOfAcksC2 = 200;
Queue c1s1 = server1.locateQueue("c1.s1");
Assert.assertNotNull(c1s1);
Queue c2s2 = server1.locateQueue("c2.s2");
Assert.assertNotNull(c2s2);
PagingStore store = server1.getPagingManager().getPageStore(TOPIC_NAME);
store.startPaging();
try (Connection connection = connectionFactory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME.toString());
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < numberOfMessages; i++) {
Message m = session.createTextMessage("hello " + i);
m.setIntProperty("i", i);
producer.send(m);
if ((i + 1) % 100 == 0) {
c1s1.pause();
c2s2.pause();
session.commit();
}
}
session.commit();
}
ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1);
{
AckManager ackManager = AckManagerProvider.getManager(server1, false);
AtomicInteger counter = new AtomicInteger(0);
for (long pageID = store.getFirstPage(); pageID <= store.getCurrentWritingPage(); pageID++) {
Page page = store.usePage(pageID);
try {
page.getMessages().forEach(pagedMessage -> {
int increment = counter.incrementAndGet();
if (increment <= numberOfAcksC1) {
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c1s1, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
}
if (increment <= numberOfAcksC2) {
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c2s2, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
}
});
} finally {
page.usageDown();
}
}
}
// In the following loop we get the ackManager, compare the stored retries, stop the server, and validate that they were reloaded correctly
for (int repeat = 0; repeat < 2; repeat++) {
logger.info("Repeating {}", repeat);
AckManager ackManager = AckManagerProvider.getManager(server1, true);
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = ackManager.sortRetries();
Assert.assertEquals(1, sortedRetries.size());
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksOnAddress = sortedRetries.get(c1s1.getAddress());
Assert.assertEquals(2, acksOnAddress.size());
JournalHashMap<AckRetry, AckRetry, Queue> acksOnc1s1 = acksOnAddress.get(c1s1.getID());
JournalHashMap<AckRetry, AckRetry, Queue> acksOnc2s2 = acksOnAddress.get(c2s2.getID());
Assert.assertEquals(numberOfAcksC1, acksOnc1s1.size());
Assert.assertEquals(numberOfAcksC2, acksOnc2s2.size());
Wait.assertEquals(numberOfMessages, c1s1::getMessageCount);
Wait.assertEquals(numberOfMessages, c2s2::getMessageCount);
AckManager originalManager = AckManagerProvider.getManager(server1, false);
server1.stop();
Assert.assertEquals(0, AckManagerProvider.getSize());
server1.start();
AckManager newManager = AckManagerProvider.getManager(server1, false);
Assert.assertEquals(1, AckManagerProvider.getSize());
Assert.assertNotSame(originalManager, AckManagerProvider.getManager(server1, true));
Assert.assertEquals(1, AckManagerProvider.getSize());
Assert.assertNotSame(newManager, ackManager);
}
AckManager ackManager = AckManagerProvider.getManager(server1, true);
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>> sortedRetries = ackManager.sortRetries();
Assert.assertEquals(1, sortedRetries.size());
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksOnAddress = sortedRetries.get(c1s1.getAddress());
JournalHashMap<AckRetry, AckRetry, Queue> acksOnc1s1 = acksOnAddress.get(c1s1.getID());
JournalHashMap<AckRetry, AckRetry, Queue> acksOnc2s2 = acksOnAddress.get(c2s2.getID());
Wait.assertEquals(0, () -> acksOnc1s1.size());
Wait.assertEquals(0, () -> acksOnc2s2.size());
for (int i = 0; i < 5; i++) {
AtomicInteger counter = new AtomicInteger(0);
for (long pageID = store.getFirstPage(); pageID <= store.getCurrentWritingPage(); pageID++) {
Page page = store.usePage(pageID);
try {
page.getMessages().forEach(pagedMessage -> {
int increment = counter.incrementAndGet();
if (increment <= numberOfAcksC1) {
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c1s1, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
}
if (increment <= numberOfAcksC2) {
ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c2s2, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
}
});
} finally {
page.usageDown();
}
}
Wait.assertEquals(0, () -> acksOnc1s1.size());
Wait.assertEquals(0, () -> acksOnc2s2.size());
}
c1s1.resume();
c2s2.resume();
// creating 5 subscriptions
for (int i = 0; i < 5; i++) {
try (Connection connection = connectionFactory.createConnection()) {
connection.setClientID("c" + i);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME.toString());
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "s" + i);
int start;
switch (i) {
case 1:
start = numberOfAcksC1;
break;
case 2:
start = numberOfAcksC2;
break;
default:
start = 0;
}
logger.debug("receiving messages for {}", i);
for (int m = start; m < numberOfMessages; m++) {
logger.debug("Receiving i={}, m={}", i, m);
TextMessage message = (TextMessage) subscriber.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("hello " + m, message.getText());
Assert.assertEquals(m, message.getIntProperty("i"));
}
Assert.assertNull(subscriber.receiveNoWait());
}
}
server1.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
Assert.assertEquals(0, getCounter(JournalRecordIds.ACK_RETRY, countJournal(server1.getConfiguration())));
Assert.assertEquals(1, AckManagerProvider.getSize());
// the server was restarted at least once, so locate the queue again
Queue c1s1AfterRestart = server1.locateQueue("c1.s1");
Assert.assertNotNull(c1s1AfterRestart);
ackManager.addRetry(referenceIDSupplier.getDefaultNodeID(), c1s1, 10_000_000L, AckReason.NORMAL);
ackManager.addRetry(referenceIDSupplier.getDefaultNodeID(), c1s1, 10_000_001L, AckReason.NORMAL);
Wait.assertTrue(() -> ackManager.sortRetries().isEmpty(), 5000);
server1.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
Assert.assertEquals(0, getCounter(JournalRecordIds.ACK_RETRY, countJournal(server1.getConfiguration())));
server1.stop();
Assert.assertEquals(0, AckManagerProvider.getSize());
}
private int getCounter(byte typeRecord, HashMap<Integer, AtomicInteger> values) {
AtomicInteger value = values.get(Integer.valueOf(typeRecord));
if (value == null) {
return 0;
} else {
return value.get();
}
}
protected static AMQPMirrorControllerTarget locateMirrorTarget(ActiveMQServer server) {
ActiveMQServerImpl theServer = (ActiveMQServerImpl) server;
for (RemotingConnection connection : theServer.getRemotingService().getConnections()) {
if (connection instanceof ActiveMQProtonRemotingConnection) {
ActiveMQProtonRemotingConnection protonRC = (ActiveMQProtonRemotingConnection) connection;
for (AMQPSessionContext sessionContext : protonRC.getAmqpConnection().getSessions().values()) {
for (ProtonAbstractReceiver receiver : sessionContext.getReceivers().values()) {
if (receiver instanceof AMQPMirrorControllerTarget) {
return (AMQPMirrorControllerTarget) receiver;
}
}
}
}
}
return null;
}
private int acksCount(File countJournalLocation) throws Exception {
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
return acksCount != null ? acksCount.get() : 0;
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@ -28,24 +27,12 @@ import javax.jms.TextMessage;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
@ -97,7 +84,7 @@ public class PagedMirrorTest extends ActiveMQTestBase {
org.apache.activemq.artemis.core.server.Queue snf2 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf2);
File countJournalLocation = server1.getConfiguration().getJournalLocation();
File countJournalLocation = server2.getConfiguration().getJournalLocation();
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
String protocol = "amqp";
@ -148,6 +135,11 @@ public class PagedMirrorTest extends ActiveMQTestBase {
}
Wait.assertEquals(0, snf1::getMessageCount);
Wait.assertEquals(0, snf2::getMessageCount);
org.apache.activemq.artemis.core.server.Queue queueServer1 = server1.locateQueue("someQueue");
org.apache.activemq.artemis.core.server.Queue queueServer2 = server2.locateQueue("someQueue");
Wait.assertEquals(NUMBER_OF_MESSAGES - 1, queueServer2::getMessageCount, 5000);
Wait.assertEquals(NUMBER_OF_MESSAGES - 1, queueServer1::getMessageCount, 5000);
Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
HashSet<Integer> receivedIDs = new HashSet<>();
@ -177,128 +169,6 @@ public class PagedMirrorTest extends ActiveMQTestBase {
}
}
@Test
public void testAckWithScan() throws Throwable {
String sendURI = "tcp://localhost:61616";
String consumeURI = "tcp://localhost:61617";
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf1);
org.apache.activemq.artemis.core.server.Queue snf2 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf2);
File countJournalLocation = server1.getConfiguration().getJournalLocation();
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
String protocol = "amqp";
ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
String bodyBuffer;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 1024; i++) {
buffer.append("*");
}
bodyBuffer = buffer.toString();
}
int NUMBER_OF_MESSAGES = 200;
try (Connection sendConnecton = sendCF.createConnection()) {
Session sendSession = sendConnecton.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = sendSession.createQueue("someQueue");
MessageProducer producer = sendSession.createProducer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sendSession.createTextMessage(bodyBuffer);
message.setIntProperty("i", i);
producer.send(message);
}
sendSession.commit();
}
Wait.assertEquals(0, snf1::getMessageCount);
Wait.assertEquals(0, snf2::getMessageCount);
org.apache.activemq.artemis.core.server.Queue serverQueue2 = server2.locateQueue("someQueue");
Assert.assertNotNull(serverQueue2);
org.apache.activemq.artemis.core.server.Queue serverQueue1 = server1.locateQueue("someQueue");
Assert.assertNotNull(serverQueue1);
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue2::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue1::getMessageCount);
ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI);
try (Connection connection = consumeCF.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 10; i++) {
Message recMessage = consumer.receive(5000);
Assert.assertNotNull(recMessage);
}
session.commit();
}
Wait.assertEquals(NUMBER_OF_MESSAGES - 10, serverQueue2::getMessageCount);
LinkedList<MessageReference> refs = new LinkedList<>();
serverQueue2.forEach((t) -> refs.add(t));
AMQPMirrorControllerTarget controllerTarget = locateMirrorTarget(server2);
CountDownLatch latch = new CountDownLatch(refs.size());
IOCallback callback = new IOCallback() {
@Override
public void done() {
latch.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
for (MessageReference r : refs) {
Long messageID = (Long)r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-mr-id"));
Object nodeID = r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-bkr-id"));
// this will force the retry on the queue after a depage happened
controllerTarget.performAckOnPage(nodeID.toString(), messageID, serverQueue2, callback);
}
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Wait.assertEquals(NUMBER_OF_MESSAGES - 10 - refs.size(), serverQueue2::getMessageCount);
}
protected static AMQPMirrorControllerTarget locateMirrorTarget(ActiveMQServer server) {
ActiveMQServerImpl theServer = (ActiveMQServerImpl) server;
for (RemotingConnection connection : theServer.getRemotingService().getConnections()) {
if (connection instanceof ActiveMQProtonRemotingConnection) {
ActiveMQProtonRemotingConnection protonRC = (ActiveMQProtonRemotingConnection) connection;
for (AMQPSessionContext sessionContext : protonRC.getAmqpConnection().getSessions().values()) {
for (ProtonAbstractReceiver receiver : sessionContext.getReceivers().values()) {
if (receiver instanceof AMQPMirrorControllerTarget) {
return (AMQPMirrorControllerTarget) receiver;
}
}
}
}
}
return null;
}
private int acksCount(File countJournalLocation) throws Exception {
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);

View File

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SNFPagedMirrorTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server1;
ActiveMQServer server2;
private static final String SNF_NAME = "$ACTIVEMQ_ARTEMIS_MIRROR_other";
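// two brokers are started with symmetric AMQP broker connections named "other", each mirroring the
// other; the SNF (store-and-forward) queue for that mirror is configured explicitly below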
@Before
@Override
public void setUp() throws Exception {
super.setUp();
server1 = createServer(true, createDefaultConfig(0, true), 1024, -1, -1, -1);
server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1));
server1.getConfiguration().getAcceptorConfigurations().clear();
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
AMQPBrokerConnectConfiguration brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
server2 = createServer(true, createDefaultConfig(1, true), 1024, -1, -1, -1);
server2.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1));
server2.getConfiguration().getAcceptorConfigurations().clear();
server2.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61617");
brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server2.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server1.start();
server2.start();
}
@Test
public void testPagedEverything() throws Throwable {
testSNFPaged("CORE", true, true, false, 1024);
}
@Test
public void testPageQueueOnly() throws Throwable {
testSNFPaged("CORE", false, true, false, 1024);
}
@Test
public void testPageSNF() throws Throwable {
testSNFPaged("CORE", true, false, false, 1024);
}
@Test
public void testNothingPaged() throws Throwable {
testSNFPaged("CORE", false, true, false, 1024);
}
@Test
public void testPageTargetQueue() throws Throwable {
testSNFPaged("CORE", false, false, true, 1024);
}
@Test
public void testPageTargetQueueAMQPLarge() throws Throwable {
testSNFPaged("AMQP", false, false, true, 250 * 1024);
}
@Test
public void testTargetPaged() throws Throwable {
server1.setIdentity("server1");
server2.setIdentity("server2");
String QUEUE_NAME = "q" + RandomUtil.randomString();
String server1URI = "tcp://localhost:61616";
String server2URI = "tcp://localhost:61617";
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
org.apache.activemq.artemis.core.server.Queue snf1 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf1);
Assert.assertEquals(2, AckManagerProvider.getSize());
server1.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server1.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Wait.assertTrue(() -> server2.locateQueue(QUEUE_NAME) != null);
org.apache.activemq.artemis.core.server.Queue queueServer2 = server2.locateQueue(QUEUE_NAME);
snf1.getPagingStore().startPaging();
Wait.assertEquals(0, snf1::getMessageCount);
queueServer2.getPagingStore().stopPaging();
int NUMBER_OF_MESSAGES = 5000;
ConnectionFactory factory1 = CFUtil.createConnectionFactory("CORE", server2URI);
logger.info("Starting producer");
try (Connection connection = factory1.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
}
logger.info("Consumed messages");
Wait.assertEquals(0, snf1::getMessageCount, 5000);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueServer2::getMessageCount);
}
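// pageSNF forces the SNF queue on server1 into paging, pageQueue forces the source queue on server1
// into paging, and pageTarget forces the mirrored queue on server2 into paging after its restart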
public void testSNFPaged(String protocol, boolean pageSNF, boolean pageQueue, boolean pageTarget, int messageSize) throws Throwable {
server1.setIdentity("server1");
server2.setIdentity("server2");
String QUEUE_NAME = "q" + RandomUtil.randomString();
String sendURI = "tcp://localhost:61616";
String consumerURI = "tcp://localhost:61617";
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
org.apache.activemq.artemis.core.server.Queue snf1 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf1);
logger.info("I currently have {} ACk Managers", AckManagerProvider.getSize());
server1.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server1.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
Wait.assertTrue(() -> server2.locateQueue(QUEUE_NAME) != null);
Wait.assertEquals(0, snf1::getMessageCount);
JournalStorageManager journalStorageManager = (JournalStorageManager) server1.getStorageManager();
server2.stop();
logger.info("I currently have {} ACk Managers", AckManagerProvider.getSize());
File countJournalLocation = server1.getConfiguration().getJournalLocation();
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, sendURI);
ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, consumerURI);
String bodyBuffer;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < messageSize; i++) {
buffer.append("*");
}
bodyBuffer = buffer.toString();
}
int NUMBER_OF_MESSAGES = 200;
if (pageQueue) {
queueOnServer1.getPagingStore().startPaging();
}
if (pageSNF) {
snf1.getPagingStore().startPaging();
Wait.assertTrue(() -> snf1.getPagingStore().isPaging(), 5000, 100);
}
try (Connection sendConnection = server1CF.createConnection()) {
Session sendSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = sendSession.createQueue(QUEUE_NAME);
MessageProducer producer = sendSession.createProducer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sendSession.createTextMessage(bodyBuffer);
message.setIntProperty("i", i);
message.setStringProperty("inPage", "this is paged");
producer.send(message);
}
sendSession.commit();
}
if (pageQueue) {
Assert.assertTrue(queueOnServer1.getPagingStore().isPaging());
} else {
Assert.assertFalse(queueOnServer1.getPagingStore().isPaging());
}
if (pageSNF) {
Assert.assertTrue(snf1.getPagingStore().isPaging());
} else {
Assert.assertFalse(snf1.getPagingStore().isPaging());
}
if (pageSNF && pageQueue) {
journalStorageManager.getMessageJournal().scheduleCompactAndBlock(60_000); // to remove previously sent records during startup
// verify everything is actually paged; nothing should have been routed to the journal
HashMap<Integer, AtomicInteger> counters = countJournal(server1.getConfiguration());
Assert.assertEquals("There are routed messages on the journal", 0, getCounter(JournalRecordIds.ADD_REF, counters));
Assert.assertEquals("There are routed messages on the journal", 0, getCounter(JournalRecordIds.ADD_MESSAGE, counters));
Assert.assertEquals("There are routed messages on the journal", 0, getCounter(JournalRecordIds.ADD_MESSAGE_PROTOCOL, counters));
}
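// restarting server2 lets the mirror reconnect and drain whatever accumulated (including paged entries)
// from the SNF queue into the mirrored queue on server2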
server2.start();
Assert.assertEquals(2, AckManagerProvider.getSize());
if (pageTarget) {
org.apache.activemq.artemis.core.server.Queue queue2 = server2.locateQueue(QUEUE_NAME);
Assert.assertNotNull(queue2);
queue2.getPagingStore().startPaging();
}
org.apache.activemq.artemis.core.server.Queue snf2 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf2);
Wait.assertEquals(0, snf1::getMessageCount);
Wait.assertEquals(0, snf2::getMessageCount);
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = server2.locateQueue(QUEUE_NAME);
Wait.assertEquals((long) NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount, 5000);
Wait.assertEquals((long) NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount, 5000);
Assert.assertEquals(0, queueOnServer1.getConsumerCount());
Assert.assertEquals(0, queueOnServer1.getDeliveringCount());
try (Connection connection = server2CF.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
System.out.println("Message " + message.getIntProperty("i"));
}
session.commit();
}
Assert.assertEquals(2, AckManagerProvider.getSize());
Wait.assertEquals(0, snf2::getMessageCount, 15_000);
Wait.assertEquals(0, queueOnServer2::getMessageCount, 15_000);
Wait.assertEquals(0, queueOnServer1::getMessageCount, 15_000);
Wait.assertEquals(0, queueOnServer1::getMessageCount, 15_000);
Wait.assertEquals(0, () -> server1.getConfiguration().getLargeMessagesLocation().listFiles().length);
Wait.assertEquals(0, () -> server2.getConfiguration().getLargeMessagesLocation().listFiles().length);
}
private int getCounter(byte typeRecord, HashMap<Integer, AtomicInteger> values) {
AtomicInteger value = values.get(Integer.valueOf(typeRecord));
if (value == null) {
return 0;
} else {
return value.get();
}
}
}

View File

@ -350,6 +350,16 @@ public class AcknowledgeTest extends ActiveMQTestBase {
return getClass().getName();
}
@Override
public void setPaged() {
}
@Override
public boolean isPaged() {
return false;
}
@Override
public SimpleString getReplyTo() {
return null;

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -49,6 +50,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -613,8 +615,9 @@ public class SendAckFailTest extends SpawnedTestBase {
Set<Pair<Long, Long>> pendingLargeMessages,
Set<Long> largeMessagesInFolder,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) throws Exception {
return manager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, largeMessagesInFolder, pendingNonTXPageCounter, journalLoader);
JournalLoader journalLoader,
List<Consumer<RecordInfo>> extraLoaders) throws Exception {
return manager.loadMessageJournal(postOffice, pagingManager, resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, largeMessagesInFolder, pendingNonTXPageCounter, journalLoader, extraLoaders);
}
@Override

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.collections.AbstractHashMapPersister;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.Assert;
import org.junit.Test;
public class JournalHashMapTest extends ActiveMQTestBase {
@Test
public void testHashMap() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
runAfter(service::shutdownNow);
OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(service);
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
JournalImpl journal = new JournalImpl(executorFactory, 10 * 1024, 10, 10, 3, 0, 50_000, factory, "coll", "data", 1, 0);
journal.start();
runAfter(journal::stop);
journal.loadInternalOnly();
AtomicLong sequence = new AtomicLong(1);
JournalHashMapProvider<Long, Long, Object> journalHashMapProvider = new JournalHashMapProvider(sequence::incrementAndGet, journal, new LongPersister(), (byte)3, OperationContextImpl::getContext, l -> null, (e, m, f) -> {
e.printStackTrace();
});
JournalHashMap<Long, Long, Object> journalHashMap = journalHashMapProvider.getMap(1);
for (long i = 0; i < 1000; i++) {
journalHashMap.put(i, RandomUtil.randomLong());
}
// repeat the puts so that replacing existing keys (which removes the previous journal records) is also exercised
for (long i = 0; i < 1000; i++) {
journalHashMap.put(i, RandomUtil.randomLong());
}
journal.flush();
journal.stop();
journalHashMapProvider.clear();
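// restart the journal and reload every record through the provider to verify the map contents survive a reload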
journal.start();
ArrayList<RecordInfo> recordInfos = new ArrayList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
journal.load(recordInfos, preparedTransactions, (a, b, c) -> { }, true);
ArrayList<JournalHashMap.MapRecord<Long, Long>> records = new ArrayList<>();
recordInfos.forEach(r -> {
Assert.assertEquals((byte)3, r.userRecordType);
journalHashMapProvider.reload(r);
});
List<JournalHashMap<Long, Long, Object>> existingLists = journalHashMapProvider.getMaps();
Assert.assertEquals(1, existingLists.size());
JournalHashMap<Long, Long, Object> reloadedList = existingLists.get(0);
Assert.assertEquals(journalHashMap.size(), reloadedList.size());
journalHashMap.forEach((a, b) -> Assert.assertEquals(b, reloadedList.get(a)));
}
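// minimal persister used by the test: keys and values are plain longs encoded as 8 bytes each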
private static class LongPersister extends AbstractHashMapPersister<Long, Long> {
@Override
public byte getID() {
return 0;
}
@Override
protected int getKeySize(Long key) {
return DataConstants.SIZE_LONG;
}
@Override
protected void encodeKey(ActiveMQBuffer buffer, Long key) {
buffer.writeLong(key);
}
@Override
protected Long decodeKey(ActiveMQBuffer buffer) {
return buffer.readLong();
}
@Override
protected int getValueSize(Long value) {
return DataConstants.SIZE_LONG;
}
@Override
protected void encodeValue(ActiveMQBuffer buffer, Long value) {
buffer.writeLong(value);
}
@Override
protected Long decodeValue(ActiveMQBuffer buffer, Long key) {
return buffer.readLong();
}
}
}

View File

@ -1,172 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PageAckScanTest extends ActiveMQTestBase {
private static final String ADDRESS = "MessagesExpiredPagingTest";
ActiveMQServer server;
protected static final int PAGE_MAX = 10 * 1024;
protected static final int PAGE_SIZE = 1 * 1024;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
config.setMessageExpiryScanPeriod(-1);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
}
@Test
public void testScanCore() throws Exception {
testScan("CORE", 5000, 1000, 100, 1024);
}
@Test
public void testScanAMQP() throws Exception {
testScan("AMQP", 5000, 1000, 100, 1024);
}
public void testScan(String protocol, int numberOfMessages, int numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
String extraBody;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
extraBody = buffer.toString();
}
Queue queue = server.locateQueue(ADDRESS);
queue.getPagingStore().startPaging();
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 20; i++) {
TextMessage message = session.createTextMessage(extraBody);
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();
}
AtomicInteger errors = new AtomicInteger(0);
ReusableLatch latch = new ReusableLatch(4);
Runnable done = latch::countDown;
Runnable notFound = () -> {
errors.incrementAndGet();
done.run();
};
AtomicInteger retried = new AtomicInteger(0);
PageSubscription subscription = queue.getPageSubscription();
subscription.scanAck(() -> false, new CompareI(15), done, notFound);
subscription.scanAck(() -> false, new CompareI(11), done, notFound);
subscription.scanAck(() -> false, new CompareI(99), done, notFound);
subscription.scanAck(() -> false, new CompareI(-30), done, notFound);
subscription.scanAck(() -> {
retried.incrementAndGet();
return true;}, new CompareI(333), done, notFound);
Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
Assert.assertEquals(2, errors.get());
Wait.assertEquals(1, retried::get);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 18; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertTrue(message.getIntProperty("i") != 11 && message.getIntProperty("i") != 15);
}
Assert.assertNull(consumer.receiveNoWait());
}
}
class CompareI implements ToIntFunction<PagedReference> {
final int i;
CompareI(int i) {
this.i = i;
}
@Override
public int applyAsInt(PagedReference ref) {
return ref.getMessage().getIntProperty("i").intValue() - i;
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
@ -269,6 +270,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
public Page usePage(long page, boolean createEntry, boolean createFile) {
return null;
}
@Override
public boolean isPageFull() {
return false;
@ -440,6 +446,14 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
return false;
}
@Override
public boolean page(Message message,
Transaction tx,
RouteContextList listCtx,
Function<Message, Message> pageDecorator) throws Exception {
return false;
}
@Override
public boolean checkPageFileExists(long page) throws Exception {
return false;

View File

@ -25,6 +25,8 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -59,13 +61,30 @@ public class PagedMirrorSmokeTest extends SmokeTestBase {
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setRole("amq").setUser("artemis").setPassword("artemis").setNoWeb(true).setConfiguration("./src/main/resources/servers/brokerConnect/pagedA").setArtemisInstance(server0Location);
cliCreateServer.createServer();
configureAcceptor(SERVER_NAME_A);
}
if (!server1Location.exists()) {
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setRole("amq").setUser("artemis").setPassword("artemis").setNoWeb(true).setConfiguration("./src/main/resources/servers/brokerConnect/pagedB").setArtemisInstance(server1Location);
cliCreateServer.createServer();
configureAcceptor(SERVER_NAME_B);
}
}
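// appends ackRetryInterval=100 to the acceptor URI that already enables amqpDuplicateDetection,
// so mirror ack retries run on a short interval in this smoke test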
private static void configureAcceptor(String serverName) throws Exception {
Path configPath = new File(getServerLocation(serverName), "./etc/broker.xml").toPath();
String brokerXML = Files.readString(configPath);
brokerXML = brokerXML.replace(";amqpDuplicateDetection=true", ";amqpDuplicateDetection=true;ackRetryInterval=100");
Assert.assertTrue(brokerXML.contains("ackRetryInterval"));
Files.writeString(configPath, brokerXML);
}

View File

@ -0,0 +1,338 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PagedSNFSoakTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static String body;
static {
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 10 * 1024) {
writer.append("This is a string ..... ");
}
body = writer.toString();
}
private static final String QUEUE_NAME = "PagedSNFSoakQueue";
public static final String DC1_NODE_A = "PagedSNFSoakTest/DC1";
public static final String DC2_NODE_A = "PagedSNFSoakTest/DC2";
private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
Process processDC1_node_A;
Process processDC2_node_A;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static void createServer(String serverName,
String connectionName,
String mirrorURI,
int portOffset) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(false);
cliCreateServer.setNoWeb(false);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.addArgs("--java-memory", "512M");
cliCreateServer.setPortOffset(portOffset);
cliCreateServer.createServer();
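// the broker.properties written below wire the mirror AMQP connection to the peer and keep
// maxSizeMessages at 100 so addresses go into paging quickly during the soak run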
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("addressSettings.#.maxSizeMessages", "100");
brokerProperties.put("addressSettings.#.addressFullMessagePolicy", "PAGING");
//brokerProperties.put("addressSettings.*MIRROR*.maxSizeMessages", "100");
//brokerProperties.put("addressSettings.*MIRROR*.addressFullMessagePolicy", "BLOCK");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
}
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
}
@Test(timeout = 240_000L)
public void testAMQP() throws Exception {
testAccumulateAndSend("AMQP");
}
@Test(timeout = 240_000L)
public void testCORE() throws Exception {
testAccumulateAndSend("CORE");
}
@Test
public void testOpenWire() throws Exception {
testAccumulateAndSend("OPENWIRE");
}
private void testAccumulateAndSend(final String protocol) throws Exception {
startDC1();
final int numberOfMessages = 400;
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI);
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
send(session, queue, numberOfMessages, null);
consume(session, numberOfMessages);
send(session, queue, numberOfMessages, null);
startDC2();
consume(session, numberOfMessages);
for (int i = 0; i < 20; i++) {
final int loopI = i;
System.err.println("Sent " + i);
logger.info("Sent {}", i);
send(session, queue, 10, m -> m.setIntProperty("loop", loopI));
consume(session, 10);
}
send(session, queue, numberOfMessages, null);
}
Wait.assertEquals((long) numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
Wait.assertEquals((long) 0, () -> getCount("DC1", simpleManagementDC1A, SNF_QUEUE), 50_000, 100);
Wait.assertEquals((long) numberOfMessages, () -> getCount("DC2", simpleManagementDC2A, QUEUE_NAME), 30_000, 100);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
consume(session, numberOfMessages);
}
Wait.assertEquals((long) 0, () -> simpleManagementDC1A.getMessageCountOnQueue(SNF_QUEUE), 5000, 100);
Wait.assertEquals((long) 0, () -> simpleManagementDC2A.getMessageCountOnQueue(SNF_QUEUE), 5000, 100);
Wait.assertEquals((long) 0, () -> simpleManagementDC1A.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
Wait.assertEquals((long) 0, () -> simpleManagementDC2A.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
}
@Test
public void testLargeBatches() throws Exception {
String protocol = "AMQP";
startDC1();
startDC2();
final int numberOfMessages = 5_000;
final int batchSize = 1_000;
final int numberOfBatches = 4;
ConnectionFactory[] cfs = new ConnectionFactory[]{CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI), CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI)};
SimpleManagement[] sm = new SimpleManagement[] {new SimpleManagement(DC1_NODEA_URI, null, null), new SimpleManagement(DC2_NODEA_URI, null, null)};
for (int nbatch = 0; nbatch < numberOfBatches; nbatch++) {
// permute which server produces and which server consumes on each batch
// bit 0 of the batch counter selects the producer, bit 1 selects the consumer
int serverToProduce = ((nbatch & 1) > 0) ? 1 : 0;
int serverToConsume = ((nbatch & 2) > 0) ? 1 : 0;
logger.info("Batch {}, sending on server {}. consuming on server {}", nbatch, serverToProduce, serverToConsume);
try (Connection connection = cfs[serverToProduce].createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < numberOfMessages; i++) {
producer.send(session.createTextMessage("msg " + i));
if (i > 0 && i % batchSize == 0) {
logger.info("Commit send {}", i);
session.commit();
}
}
session.commit();
}
for (SimpleManagement s : sm) {
logger.info("Checking counts on SNF for {}", s.getUri());
Wait.assertEquals((long) 0, () -> s.getMessageCountOnQueue(SNF_QUEUE), 120_000, 100);
logger.info("Checking counts on {} on {}", QUEUE_NAME, s.getUri());
Wait.assertEquals((long) numberOfMessages, () -> s.getMessageCountOnQueue(QUEUE_NAME), 60_000, 100);
}
try (Connection connection = cfs[serverToConsume].createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
if (i > 0 && i % batchSize == 0) {
logger.info("Commit consume {}", i);
session.commit();
}
}
session.commit();
}
for (SimpleManagement s : sm) {
logger.info("Checking 0 counts on SNF for {}", s.getUri());
Wait.assertEquals((long) 0, () -> s.getMessageCountOnQueue(SNF_QUEUE), 120_000, 100);
logger.info("Checking for empty queue on {}", s.getUri());
Wait.assertEquals((long) 0, () -> s.getMessageCountOnQueue(QUEUE_NAME), 60_000, 100);
}
}
}
private static void consume(Session session, int numberOfMessages) throws JMSException {
Queue queue = session.createQueue(QUEUE_NAME);
try (MessageConsumer consumer = session.createConsumer(queue)) {
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(10_000);
Assert.assertNotNull(message);
Assert.assertEquals(body, message.getText());
Assert.assertEquals(i, message.getIntProperty("id"));
logger.info("received {}", i);
if ((i + 1) % 10 == 0) {
session.commit();
}
}
session.commit();
}
}
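// throwing variant of Consumer so the send helper can let callers decorate messages with extra properties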
private interface Receive<T> {
void accept(T message) throws Exception;
}
private static void send(Session session,
Queue queue,
int numberOfMessages,
Receive<Message> setter) throws JMSException {
try (MessageProducer producer = session.createProducer(queue)) {
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = session.createTextMessage(body);
message.setIntProperty("id", i);
logger.info("send {}", i);
if (setter != null) {
try {
setter.accept(message);
} catch (Throwable ignored) {
}
}
producer.send(message);
if (i % 10 == 0) {
logger.debug("Sent {} messages", i);
session.commit();
}
}
session.commit();
}
}
int getNumberOfLargeMessages(String serverName) throws Exception {
File lmFolder = new File(getServerLocation(serverName) + "/data/large-messages");
Assert.assertTrue(lmFolder.exists());
return lmFolder.list().length;
}
private void startDC1() throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
}
private void stopDC1() throws Exception {
processDC1_node_A.destroyForcibly();
Assert.assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS));
}
private void stopDC2() throws Exception {
processDC2_node_A.destroyForcibly();
Assert.assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));
}
private void startDC2() throws Exception {
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
}
public long getCount(String place, SimpleManagement simpleManagement, String queue) throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.info("count on {}, queue {} is {}", place, queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return -1;
}
}
}