ARTEMIS-3767 Incompatibility on replication between 2.17 and current version

This commit is contained in:
Clebert Suconic 2022-07-15 16:00:41 -04:00 committed by clebertsuconic
parent 7bc3b02809
commit 8e54a65227
11 changed files with 197 additions and 51 deletions

View File

@ -71,6 +71,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return version >= PacketImpl.ARTEMIS_2_24_0_VERSION;
}
default boolean isBeforeTwoEighteen() {
int version = getChannelVersion();
return version < PacketImpl.ARTEMIS_2_18_0_VERSION;
}
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types

View File

@ -162,11 +162,11 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
break;
}
case REPLICATION_APPEND: {
packet = new ReplicationAddMessage();
packet = new ReplicationAddMessage(connection.isBeforeTwoEighteen());
break;
}
case REPLICATION_APPEND_TX: {
packet = new ReplicationAddTXMessage();
packet = new ReplicationAddTXMessage(connection.isBeforeTwoEighteen());
break;
}
case REPLICATION_DELETE: {
@ -222,7 +222,7 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
break;
}
case PacketImpl.REPLICATION_START_FINISH_SYNC: {
packet = new ReplicationStartSyncMessage();
packet = new ReplicationStartSyncMessage(connection.isBeforeTwoEighteen());
break;
}
case PacketImpl.REPLICATION_SYNC_FILE: {

View File

@ -43,17 +43,22 @@ public final class ReplicationAddMessage extends PacketImpl {
private byte[] recordData;
public ReplicationAddMessage() {
// this is for version compatibility
private final boolean beforeTwoEighteen;
public ReplicationAddMessage(final boolean beforeTwoEighteen) {
super(PacketImpl.REPLICATION_APPEND);
this.beforeTwoEighteen = beforeTwoEighteen;
}
public ReplicationAddMessage(final byte journalID,
public ReplicationAddMessage(final boolean beforeTwoEighteen,
final byte journalID,
final ADD_OPERATION_TYPE operation,
final long id,
final byte journalRecordType,
final Persister persister,
final Object encodingData) {
this();
this(beforeTwoEighteen);
this.journalID = journalID;
this.operation = operation;
this.id = id;
@ -77,7 +82,11 @@ public final class ReplicationAddMessage extends PacketImpl {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeByte(operation.toRecord());
if (beforeTwoEighteen) {
buffer.writeBoolean(operation == ADD_OPERATION_TYPE.UPDATE);
} else {
buffer.writeByte(operation.toRecord());
}
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
buffer.writeInt(persister.getEncodeSize(encodingData));
@ -87,7 +96,16 @@ public final class ReplicationAddMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
if (beforeTwoEighteen) {
boolean isUpdate = buffer.readBoolean();
if (isUpdate) {
operation = ADD_OPERATION_TYPE.UPDATE;
} else {
operation = ADD_OPERATION_TYPE.ADD;
}
} else {
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
}
id = buffer.readLong();
journalRecordType = buffer.readByte();
final int recordDataSize = buffer.readInt();

View File

@ -45,18 +45,23 @@ public class ReplicationAddTXMessage extends PacketImpl {
private ADD_OPERATION_TYPE operation;
public ReplicationAddTXMessage() {
// this is for version compatibility
private final boolean beforeTwoEighteen;
public ReplicationAddTXMessage(final boolean beforeTwoEighteen) {
super(PacketImpl.REPLICATION_APPEND_TX);
this.beforeTwoEighteen = beforeTwoEighteen;
}
public ReplicationAddTXMessage(final byte journalID,
public ReplicationAddTXMessage(final boolean beforeTwoEighteen,
final byte journalID,
final ADD_OPERATION_TYPE operation,
final long txId,
final long id,
final byte recordType,
final Persister persister,
final Object encodingData) {
this();
this(beforeTwoEighteen);
this.journalID = journalID;
this.operation = operation;
this.txId = txId;
@ -82,7 +87,11 @@ public class ReplicationAddTXMessage extends PacketImpl {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeByte(operation.toRecord());
if (beforeTwoEighteen) {
buffer.writeBoolean(operation == ADD_OPERATION_TYPE.UPDATE);
} else {
buffer.writeByte(operation.toRecord());
}
buffer.writeLong(txId);
buffer.writeLong(id);
buffer.writeByte(recordType);
@ -93,7 +102,16 @@ public class ReplicationAddTXMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
if (beforeTwoEighteen) {
boolean isUpdate = buffer.readBoolean();
if (isUpdate) {
operation = ADD_OPERATION_TYPE.UPDATE;
} else {
operation = ADD_OPERATION_TYPE.ADD;
}
} else {
operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
}
txId = buffer.readLong();
id = buffer.readLong();
recordType = buffer.readByte();

View File

@ -16,16 +16,16 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.List;
/**
* This message may signal start or end of the replication synchronization.
* <p>
@ -40,6 +40,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
private String nodeID;
private boolean allowsAutoFailBack;
// this is for version compatibility
// certain versions will need to interrupt encoding and decoding after synchronizationIsFinished on the encoding depending on its value
private final boolean beforeTwoEighteen;
public enum SyncDataType {
JournalBindings(AbstractJournalStorageManager.JournalContent.BINDINGS.typeByte),
JournalMessages(AbstractJournalStorageManager.JournalContent.MESSAGES.typeByte),
@ -70,12 +74,13 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
}
public ReplicationStartSyncMessage() {
public ReplicationStartSyncMessage(boolean beforeTwoEighteen) {
super(REPLICATION_START_FINISH_SYNC);
this.beforeTwoEighteen = synchronizationIsFinished;
}
public ReplicationStartSyncMessage(List<Long> filenames) {
this();
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, List<Long> filenames) {
this(beforeTwoEighteen);
ids = new long[filenames.size()];
for (int i = 0; i < filenames.size(); i++) {
ids[i] = filenames.get(i);
@ -85,24 +90,24 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
public ReplicationStartSyncMessage(String nodeID, long nodeDataVersion) {
this(nodeID);
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, String nodeID, long nodeDataVersion) {
this(beforeTwoEighteen, nodeID);
ids = new long[1];
ids[0] = nodeDataVersion;
dataType = SyncDataType.ActivationSequence;
}
public ReplicationStartSyncMessage(String nodeID) {
this();
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, String nodeID) {
this(beforeTwoEighteen);
synchronizationIsFinished = true;
this.nodeID = nodeID;
}
public ReplicationStartSyncMessage(JournalFile[] datafiles,
public ReplicationStartSyncMessage(boolean beforeTwoEighteen, JournalFile[] datafiles,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) {
this();
this(beforeTwoEighteen);
this.nodeID = nodeID;
this.allowsAutoFailBack = allowsAutoFailBack;
synchronizationIsFinished = false;
@ -143,6 +148,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
buffer.writeBoolean(synchronizationIsFinished);
buffer.writeBoolean(allowsAutoFailBack);
buffer.writeString(nodeID);
if (beforeTwoEighteen && synchronizationIsFinished) {
// At this point, pre 2.18.0 servers don't expect any more data to come.
return;
}
buffer.writeByte(dataType.code);
buffer.writeInt(ids.length);
for (long id : ids) {
@ -155,6 +164,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
synchronizationIsFinished = buffer.readBoolean();
allowsAutoFailBack = buffer.readBoolean();
nodeID = buffer.readString();
if (buffer.readableBytes() == 0) {
// Pre-2.18.0 server wouldn't send anything more than this.
return;
}
dataType = SyncDataType.getDataType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];

View File

@ -224,7 +224,7 @@ public final class ReplicationManager implements ActiveMQComponent {
final Persister persister,
final Object record) throws Exception {
if (enabled) {
sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record));
sendReplicatePacket(new ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, id, recordType, persister, record));
}
}
@ -242,7 +242,7 @@ public final class ReplicationManager implements ActiveMQComponent {
final Persister persister,
final Object record) throws Exception {
if (enabled) {
sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record));
sendReplicatePacket(new ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, txID, id, recordType, persister, record));
}
}
@ -801,7 +801,7 @@ public final class ReplicationManager implements ActiveMQComponent {
String nodeID,
boolean allowsAutoFailBack) throws ActiveMQException {
if (enabled)
sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID, allowsAutoFailBack));
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), datafiles, contentType, nodeID, allowsAutoFailBack));
}
/**
@ -820,7 +820,7 @@ public final class ReplicationManager implements ActiveMQComponent {
}
synchronizationIsFinishedAcknowledgement.countUp();
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID, server.getNodeManager().getNodeActivationSequence()));
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), nodeID, server.getNodeManager().getNodeActivationSequence()));
try {
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
ActiveMQReplicationTimeooutException exception = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
@ -865,7 +865,7 @@ public final class ReplicationManager implements ActiveMQComponent {
idsToSend = new ArrayList<>(largeMessages.keySet());
if (enabled)
sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend));
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), idsToSend));
}
/**

View File

@ -412,6 +412,58 @@
<variableName>ARTEMIS-2_10_0</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>2_17_0-check</id>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.17.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.17.0</arg>
<arg>org.apache.activemq:artemis-cli:2.17.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.17.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.17.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.17.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
<arg>org.jboss.marshalling:jboss-marshalling-river:2.0.9.Final</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-2_17_0</variableName>
</configuration>
</execution> <execution>
<phase>compile</phase>
<goals>
<goal>dependency-scan</goal>
</goals>
<id>2_18_0-check</id>
<configuration>
<optional>true</optional>
<libListWithDeps>
<arg>org.apache.activemq:artemis-jms-server:2.18.0</arg>
<arg>org.apache.activemq:artemis-jms-client:2.18.0</arg>
<arg>org.apache.activemq:artemis-cli:2.18.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.18.0</arg>
<arg>org.apache.activemq:artemis-amqp-protocol:2.18.0</arg>
<arg>org.apache.activemq:artemis-hornetq-protocol:2.18.0</arg>
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
<arg>org.jboss.marshalling:jboss-marshalling-river:2.0.9.Final</arg>
</libListWithDeps>
<libList>
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
</libList>
<!-- for future maintainers, notice that if you add new variables you also need to add the system property
otherwise this is not captured, search for the word @@@@ on this pom where I left anothr comment -->
<variableName>ARTEMIS-2_18_0</variableName>
</configuration>
</execution>
<execution>
<phase>compile</phase>
<goals>
@ -661,6 +713,14 @@
<name>ARTEMIS-2_10_0</name> <!-- 2.10.0 -->
<value>${ARTEMIS-2_10_0}</value>
</property>
<property>
<name>ARTEMIS-2_17_0</name>
<value>${ARTEMIS-2_17_0}</value>
</property>
<property>
<name>ARTEMIS-2_18_0</name>
<value>${ARTEMIS-2_18_0}</value>
</property>
<property>
<name>ARTEMIS-2_22_0</name>
<value>${ARTEMIS-2_22_0}</value>

View File

@ -39,6 +39,8 @@ public class GroovyRun {
public static final String TWO_SIX_THREE = "ARTEMIS-263";
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
public static final String TWO_TEN_ZERO = "ARTEMIS-2_10_0";
public static final String TWO_SEVENTEEN_ZERO = "ARTEMIS-2_17_0";
public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0";
public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";

View File

@ -39,7 +39,12 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port);
configuration.addConnectorConfiguration("local", "tcp://localhost:" + port);
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true);
configuration.setGlobalMaxMessages(100);
if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) {
configuration.globalMaxMessages = 10
} else {
configuration.globalMaxSize = 10 * 1024
}
configuration.setHAPolicyConfiguration(new ReplicaPolicyConfiguration().setClusterName("main"))
configuration.addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));

View File

@ -41,7 +41,13 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port);
configuration.addConnectorConfiguration("local", "tcp://localhost:" + port);
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true);
configuration.setGlobalMaxMessages(10);
if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) {
configuration.globalMaxMessages = 10
} else {
configuration.globalMaxSize = 10 * 1024
}
configuration.setHAPolicyConfiguration(new ReplicatedPolicyConfiguration().setClusterName("main"))
configuration.addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));

View File

@ -38,6 +38,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_EIGHTEEN_ZERO;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SEVENTEEN_ZERO;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYTWO_ZERO;
@RunWith(Parameterized.class)
@ -58,7 +60,11 @@ public class MultiVersionReplicaTest extends ClasspathBase {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{TWO_TWENTYTWO_ZERO, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYTWO_ZERO});
// The SNAPSHOT/SNAPSHOT is here as a teest validation only, like in other cases where SNAPSHOT/SNAPSHOT is used.
combinations.add(new Object[]{TWO_SEVENTEEN_ZERO, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, TWO_SEVENTEEN_ZERO});
combinations.add(new Object[]{TWO_EIGHTEEN_ZERO, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, TWO_EIGHTEEN_ZERO});
// The SNAPSHOT/SNAPSHOT is here as a test validation only, like in other cases where SNAPSHOT/SNAPSHOT is used.
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT});
return combinations;
}
@ -93,35 +99,48 @@ public class MultiVersionReplicaTest extends ClasspathBase {
evaluate(mainClassloader, "multiVersionReplica/mainServerIsReplicated.groovy");
send(new ActiveMQConnectionFactory("tcp://localhost:61000"), 2000);
send(new JmsConnectionFactory("amqp://localhost:61000"), 2000);
send(new ActiveMQConnectionFactory("tcp://localhost:61000"), 2000, 10);
send(new JmsConnectionFactory("amqp://localhost:61000"), 2000, 10);
evaluate(mainClassloader, "multiVersionReplica/mainServerStop.groovy");
evaluate(backupClassLoader, "multiVersionReplica/backupServerIsActive.groovy");
receive(new ActiveMQConnectionFactory("tcp://localhost:61001"), 2000);
receive(new JmsConnectionFactory("amqp://localhost:61001"), 2000);
receive(new ActiveMQConnectionFactory("tcp://localhost:61001"), 2010);
receive(new JmsConnectionFactory("amqp://localhost:61001"), 2010);
evaluate(backupClassLoader, "multiVersionReplica/backupServerStop.groovy");
}
private void send(ConnectionFactory factory, int numberOfMessages) throws Throwable {
private void send(ConnectionFactory factory, int numberOfMessagesTx, int numberOfMessagesNonTx) throws Throwable {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
boolean pending = false;
for (int i = 0; i < numberOfMessages; i++) {
producer.send(session.createTextMessage("Hello world!!!!!"));
pending = true;
if (i > 0 && i % 100 == 0) {
session.commit();
pending = false;
Queue queue;
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
boolean pending = false;
for (int i = 0; i < numberOfMessagesTx; i++) {
producer.send(session.createTextMessage("Hello world!!!!!"));
pending = true;
if (i > 0 && i % 100 == 0) {
session.commit();
pending = false;
}
}
if (pending) {
session.commit();
}
session.close();
}
if (pending) {
session.commit();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessagesNonTx; i++) {
producer.send(session.createTextMessage("Hello world!!!!!"));
}
}
}
}