diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 5f31a2b216..77bf9da5e2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws InterruptedException { + return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, unit); + } + @Override public int getDataFilesCount() { return filesRepository.getDataFilesCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 87f4fc9982..c54a3b77b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -568,6 +568,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw new ActiveMQIllegalStateException("already replicating"); replicator = replicationManager; + if (!((JournalImpl) originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live message journal is busy"); + } + + if (!((JournalImpl) originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live bindings journal is busy"); + } + // Establishes lock originalMessageJournal.synchronizationLock(); originalBindingsJournal.synchronizationLock(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java new file mode 100644 index 0000000000..3997c1dad3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.replication; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.junit.Wait; +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class SharedNothingReplicationTest { + + private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class); + + @Rule + public TemporaryFolder brokersFolder = new TemporaryFolder(); + + private SlowMessagePersister slowMessagePersister; + + @Before + public void setUp() throws Exception { + slowMessagePersister = new SlowMessagePersister(); + CoreMessagePersister.theInstance = slowMessagePersister; + } + + @After + public void tearDown() throws Exception { + if (slowMessagePersister != null) { + CoreMessagePersister.theInstance = slowMessagePersister.persister; + } + } + + @Test + public void testReplicateFromSlowLive() throws Exception { + // start live + Configuration liveConfiguration = createLiveConfiguration(); + ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration); + liveServer.start(); + + Wait.waitFor(() -> liveServer.isStarted()); + + CoreMessagePersister.theInstance = SlowMessagePersister._getInstance(); + + final CountDownLatch replicated = new CountDownLatch(1); + + ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616"); + locator.setCallTimeout(60_000L); + locator.setConnectionTTL(60_000L); + locator.addClusterTopologyListener(new ClusterTopologyListener() { + @Override + public void nodeUP(TopologyMember member, boolean last) { + logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(), member.getBackup()); + if (member.getBackup() != null) { + replicated.countDown(); + } + } + + @Override + public void nodeDown(long eventUID, String nodeID) { + + } + }); + + final ClientSessionFactory csf = locator.createSessionFactory(); + ClientSession sess = csf.createSession(); + sess.createQueue("slow", RoutingType.ANYCAST, "slow", true); + sess.close(); + Executor sendMessageExecutor = Executors.newCachedThreadPool(); + + // let's write some messages + int i = 0; + final int j = 50; + final CountDownLatch allMessageSent = new CountDownLatch(j); + while (i < 5) { + sendMessageExecutor.execute(() -> { + try { + ClientSession session = csf.createSession(true, true); + ClientProducer producer = session.createProducer("slow"); + ClientMessage message = session.createMessage(true); + // this will make journal's append executor busy + message.putLongProperty("delay", 500L); + logger.infof("try to send a message before replicated"); + producer.send(message); + logger.info("send message done"); + producer.close(); + session.close(); + + allMessageSent.countDown(); + } catch (ActiveMQException e) { + logger.error("send message", e); + } + }); + i++; + } + + // start backup + Configuration backupConfiguration = createBackupConfiguration(); + ActiveMQServer backupServer = ActiveMQServers.newActiveMQServer(backupConfiguration); + backupServer.start(); + + Wait.waitFor(() -> backupServer.isStarted()); + + Assert.assertTrue("can not replicate in 30 seconds", replicated.await(30, TimeUnit.SECONDS)); + + while (i < j) { + sendMessageExecutor.execute(() -> { + try { + ClientSession session = csf.createSession(true, true); + ClientProducer producer = session.createProducer("slow"); + ClientMessage message = session.createMessage(true); + message.putLongProperty("delay", 0L); + logger.infof("try to send a message after replicated"); + producer.send(message); + logger.info("send message done"); + producer.close(); + session.close(); + + allMessageSent.countDown(); + } catch (ActiveMQException e) { + logger.error("send message", e); + } + }); + i++; + } + + Assert.assertTrue("all message sent", allMessageSent.await(30, TimeUnit.SECONDS)); + + csf.close(); + locator.close(); + backupServer.stop(true); + liveServer.stop(true); + + SequentialFileFactory fileFactory; + + File liveJournalDir = brokersFolder.getRoot().toPath().resolve("live").resolve("data").resolve("journal").toFile(); + fileFactory = new MappedSequentialFileFactory(liveConfiguration.getJournalLocation(), liveConfiguration.getJournalFileSize(), false, liveConfiguration.getJournalBufferSize_NIO(), liveConfiguration.getJournalBufferTimeout_NIO(), null); + + JournalImpl liveMessageJournal = new JournalImpl(liveConfiguration.getJournalFileSize(), liveConfiguration.getJournalMinFiles(), liveConfiguration.getJournalPoolFiles(), liveConfiguration.getJournalCompactMinFiles(), liveConfiguration.getJournalCompactPercentage(), fileFactory, "activemq-data", "amq", fileFactory.getMaxIO()); + + liveMessageJournal.start(); + final AtomicInteger liveJournalCounter = new AtomicInteger(); + liveMessageJournal.load(new AddRecordLoaderCallback() { + @Override + public void addRecord(RecordInfo info) { + if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) { + // ignore + } + logger.infof("got live message %d", info.id); + liveJournalCounter.incrementAndGet(); + } + }); + + // read backup's journal + File backupJournalDir = brokersFolder.getRoot().toPath().resolve("backup").resolve("data").resolve("journal").toFile(); + fileFactory = new MappedSequentialFileFactory(backupConfiguration.getJournalLocation(), backupConfiguration.getJournalFileSize(), false, backupConfiguration.getJournalBufferSize_NIO(), backupConfiguration.getJournalBufferTimeout_NIO(), null); + + JournalImpl backupMessageJournal = new JournalImpl(backupConfiguration.getJournalFileSize(), backupConfiguration.getJournalMinFiles(), backupConfiguration.getJournalPoolFiles(), backupConfiguration.getJournalCompactMinFiles(), backupConfiguration.getJournalCompactPercentage(), fileFactory, "activemq-data", "amq", fileFactory.getMaxIO()); + + backupMessageJournal.start(); + + final AtomicInteger replicationCounter = new AtomicInteger(); + backupMessageJournal.load(new AddRecordLoaderCallback() { + @Override + public void addRecord(RecordInfo info) { + if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) { + // ignore + } + logger.infof("replicated message %d", info.id); + replicationCounter.incrementAndGet(); + } + }); + + logger.infof("expected %d messages, live=%d, backup=%d", j, liveJournalCounter.get(), replicationCounter.get()); + Assert.assertEquals("Live lost journal record", j, liveJournalCounter.get()); + Assert.assertEquals("Backup did not replicated all journal", j, replicationCounter.get()); + + // if this ever happens.. you need to make sure this persister is registered instead of the CoreMessagePersister + Assert.assertTrue("The test is not valid, slow persister stopped being used", SlowMessagePersister._getInstance().used); + } + + private Configuration createLiveConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::live"); + + File liveDir = brokersFolder.newFolder("live"); + conf.setBrokerInstance(liveDir); + + conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ReplicatedPolicyConfiguration haPolicy = new ReplicatedPolicyConfiguration(); + haPolicy.setVoteOnReplicationFailure(false); + haPolicy.setCheckForLiveServer(false); + conf.setHAPolicyConfiguration(haPolicy); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); + ccconf.setName("cluster"); + ccconf.setConnectorName("live"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + private Configuration createBackupConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::backup"); + + File backupDir = brokersFolder.newFolder("backup"); + conf.setBrokerInstance(backupDir); + + ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration(); + haPolicy.setClusterName("cluster"); + conf.setHAPolicyConfiguration(haPolicy); + + conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); + ccconf.setName("cluster"); + ccconf.setConnectorName("backup"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + static class SlowMessagePersister extends CoreMessagePersister implements Persister { + + boolean used = false; + + private static final Logger logger = Logger.getLogger(SlowMessagePersister.class); + + static SlowMessagePersister theInstance; + + private final CoreMessagePersister persister; + + private SlowMessagePersister() { + persister = CoreMessagePersister.getInstance(); + } + + static SlowMessagePersister _getInstance() { + if (theInstance == null) { + theInstance = new SlowMessagePersister(); + } + return theInstance; + } + + @Override + public byte getID() { + return persister.getID(); + } + + @Override + public int getEncodeSize(Message record) { + return persister.getEncodeSize(record); + } + + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + used = true; + try { + Long delay = record.getLongProperty("delay"); + if (delay == null || delay.longValue() <= 0) { + logger.infof("encode message %d, caller=%s", record.getMessageID(), Thread.currentThread().getName()); + } else { + logger.infof("sleep %d ms before encode message %d, caller=%s", delay.longValue(), record.getMessageID(), Thread.currentThread().getName()); + Thread.sleep(delay.longValue()); + } + } catch (InterruptedException e) { + // it's ok + } + persister.encode(buffer, record); + } + + @Override + public Message decode(ActiveMQBuffer buffer, Message record) { + return persister.decode(buffer, record); + } + } + + abstract class AddRecordLoaderCallback implements LoaderCallback { + + @Override + public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction) { + + } + + @Override + public void deleteRecord(long id) { + + } + + @Override + public void updateRecord(RecordInfo info) { + + } + + @Override + public void failedTransaction(long transactionID, List records, List recordsToDelete) { + + } + } + +} \ No newline at end of file