From c4bfb9521fd322c7179d31d5b5f7acf3f25d32dd Mon Sep 17 00:00:00 2001 From: shoukun Date: Wed, 27 Dec 2017 10:23:33 +0800 Subject: [PATCH 1/2] ARTEMIS-1570 Flush appendExecutor before taking journal snapshot When the live starts replication, it must make sure there are no pending writes in the message & bindings journals, or we may lose journal records during initial replication. So we need to flush the append executor after acquiring the StorageManager's write lock, before the Journal's write lock. Also we set a 10 second timeout when flushing, the same as Journal::flushExecutor. If we fail to flush within 10 seconds, we abort replication; the backup will try again later. Use OrderedExecutorFactory::flushExecutor to flush the executor --- .../activemq/artemis/core/journal/impl/JournalImpl.java | 4 ++++ .../persistence/impl/journal/JournalStorageManager.java | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 5f31a2b216..77bf9da5e2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws InterruptedException { + return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, unit); + } + @Override public int getDataFilesCount() { return filesRepository.getDataFilesCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 87f4fc9982..c54a3b77b5 100644 --- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -568,6 +568,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw new ActiveMQIllegalStateException("already replicating"); replicator = replicationManager; + if (!((JournalImpl) originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live message journal is busy"); + } + + if (!((JournalImpl) originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live bindings journal is busy"); + } + // Establishes lock originalMessageJournal.synchronizationLock(); originalBindingsJournal.synchronizationLock(); From 7c6530eb79a8fa642d1f0d7dd23e95932a48cee3 Mon Sep 17 00:00:00 2001 From: shoukun Date: Thu, 28 Dec 2017 14:57:18 +0800 Subject: [PATCH 2/2] ARTEMIS-1570 Test replication consistency Test consistency between live and backup, especially on a slow live. The test uses MessagePersister::encode to simulate a slow IO condition. After the live has started, we send 5 messages with a delay (default 500ms), then start the backup, wait until replicated, then send more messages without delay. If all messages are sent successfully, the backup should have the same messages as the live. We assert the message count only. 
--- .../SharedNothingReplicationTest.java | 372 ++++++++++++++++++ 1 file changed, 372 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java new file mode 100644 index 0000000000..3997c1dad3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.replication; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import 
org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.junit.Wait; +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class SharedNothingReplicationTest { + + private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class); + + @Rule + public TemporaryFolder brokersFolder = new TemporaryFolder(); + + private SlowMessagePersister slowMessagePersister; + + @Before + public void setUp() throws Exception { + slowMessagePersister = new SlowMessagePersister(); + CoreMessagePersister.theInstance = slowMessagePersister; + } + + @After + public void tearDown() throws Exception { + if (slowMessagePersister != null) { + CoreMessagePersister.theInstance = slowMessagePersister.persister; + } + } + + @Test + public void testReplicateFromSlowLive() throws Exception { + // start live + Configuration liveConfiguration = createLiveConfiguration(); + ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration); + liveServer.start(); + + Wait.waitFor(() -> liveServer.isStarted()); + + CoreMessagePersister.theInstance = SlowMessagePersister._getInstance(); + + final CountDownLatch replicated = new CountDownLatch(1); + + ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616"); + locator.setCallTimeout(60_000L); + locator.setConnectionTTL(60_000L); + locator.addClusterTopologyListener(new ClusterTopologyListener() { + @Override + public void nodeUP(TopologyMember member, boolean 
last) { + logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(), member.getBackup()); + if (member.getBackup() != null) { + replicated.countDown(); + } + } + + @Override + public void nodeDown(long eventUID, String nodeID) { + + } + }); + + final ClientSessionFactory csf = locator.createSessionFactory(); + ClientSession sess = csf.createSession(); + sess.createQueue("slow", RoutingType.ANYCAST, "slow", true); + sess.close(); + Executor sendMessageExecutor = Executors.newCachedThreadPool(); + + // let's write some messages + int i = 0; + final int j = 50; + final CountDownLatch allMessageSent = new CountDownLatch(j); + while (i < 5) { + sendMessageExecutor.execute(() -> { + try { + ClientSession session = csf.createSession(true, true); + ClientProducer producer = session.createProducer("slow"); + ClientMessage message = session.createMessage(true); + // this will make journal's append executor busy + message.putLongProperty("delay", 500L); + logger.infof("try to send a message before replicated"); + producer.send(message); + logger.info("send message done"); + producer.close(); + session.close(); + + allMessageSent.countDown(); + } catch (ActiveMQException e) { + logger.error("send message", e); + } + }); + i++; + } + + // start backup + Configuration backupConfiguration = createBackupConfiguration(); + ActiveMQServer backupServer = ActiveMQServers.newActiveMQServer(backupConfiguration); + backupServer.start(); + + Wait.waitFor(() -> backupServer.isStarted()); + + Assert.assertTrue("can not replicate in 30 seconds", replicated.await(30, TimeUnit.SECONDS)); + + while (i < j) { + sendMessageExecutor.execute(() -> { + try { + ClientSession session = csf.createSession(true, true); + ClientProducer producer = session.createProducer("slow"); + ClientMessage message = session.createMessage(true); + message.putLongProperty("delay", 0L); + logger.infof("try to send a message after replicated"); + producer.send(message); + logger.info("send message 
done"); + producer.close(); + session.close(); + + allMessageSent.countDown(); + } catch (ActiveMQException e) { + logger.error("send message", e); + } + }); + i++; + } + + Assert.assertTrue("all message sent", allMessageSent.await(30, TimeUnit.SECONDS)); + + csf.close(); + locator.close(); + backupServer.stop(true); + liveServer.stop(true); + + SequentialFileFactory fileFactory; + + File liveJournalDir = brokersFolder.getRoot().toPath().resolve("live").resolve("data").resolve("journal").toFile(); + fileFactory = new MappedSequentialFileFactory(liveConfiguration.getJournalLocation(), liveConfiguration.getJournalFileSize(), false, liveConfiguration.getJournalBufferSize_NIO(), liveConfiguration.getJournalBufferTimeout_NIO(), null); + + JournalImpl liveMessageJournal = new JournalImpl(liveConfiguration.getJournalFileSize(), liveConfiguration.getJournalMinFiles(), liveConfiguration.getJournalPoolFiles(), liveConfiguration.getJournalCompactMinFiles(), liveConfiguration.getJournalCompactPercentage(), fileFactory, "activemq-data", "amq", fileFactory.getMaxIO()); + + liveMessageJournal.start(); + final AtomicInteger liveJournalCounter = new AtomicInteger(); + liveMessageJournal.load(new AddRecordLoaderCallback() { + @Override + public void addRecord(RecordInfo info) { + if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) { + // ignore + } + logger.infof("got live message %d", info.id); + liveJournalCounter.incrementAndGet(); + } + }); + + // read backup's journal + File backupJournalDir = brokersFolder.getRoot().toPath().resolve("backup").resolve("data").resolve("journal").toFile(); + fileFactory = new MappedSequentialFileFactory(backupConfiguration.getJournalLocation(), backupConfiguration.getJournalFileSize(), false, backupConfiguration.getJournalBufferSize_NIO(), backupConfiguration.getJournalBufferTimeout_NIO(), null); + + JournalImpl backupMessageJournal = new JournalImpl(backupConfiguration.getJournalFileSize(), 
backupConfiguration.getJournalMinFiles(), backupConfiguration.getJournalPoolFiles(), backupConfiguration.getJournalCompactMinFiles(), backupConfiguration.getJournalCompactPercentage(), fileFactory, "activemq-data", "amq", fileFactory.getMaxIO()); + + backupMessageJournal.start(); + + final AtomicInteger replicationCounter = new AtomicInteger(); + backupMessageJournal.load(new AddRecordLoaderCallback() { + @Override + public void addRecord(RecordInfo info) { + if (!(info.userRecordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL)) { + // ignore + } + logger.infof("replicated message %d", info.id); + replicationCounter.incrementAndGet(); + } + }); + + logger.infof("expected %d messages, live=%d, backup=%d", j, liveJournalCounter.get(), replicationCounter.get()); + Assert.assertEquals("Live lost journal record", j, liveJournalCounter.get()); + Assert.assertEquals("Backup did not replicated all journal", j, replicationCounter.get()); + + // if this ever happens.. you need to make sure this persister is registered instead of the CoreMessagePersister + Assert.assertTrue("The test is not valid, slow persister stopped being used", SlowMessagePersister._getInstance().used); + } + + private Configuration createLiveConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::live"); + + File liveDir = brokersFolder.newFolder("live"); + conf.setBrokerInstance(liveDir); + + conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ReplicatedPolicyConfiguration haPolicy = new ReplicatedPolicyConfiguration(); + haPolicy.setVoteOnReplicationFailure(false); + haPolicy.setCheckForLiveServer(false); + conf.setHAPolicyConfiguration(haPolicy); + + ClusterConnectionConfiguration ccconf = new 
ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); + ccconf.setName("cluster"); + ccconf.setConnectorName("live"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + private Configuration createBackupConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::backup"); + + File backupDir = brokersFolder.newFolder("backup"); + conf.setBrokerInstance(backupDir); + + ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration(); + haPolicy.setClusterName("cluster"); + conf.setHAPolicyConfiguration(haPolicy); + + conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); + ccconf.setName("cluster"); + ccconf.setConnectorName("backup"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + static class SlowMessagePersister extends CoreMessagePersister implements Persister { + + boolean used = false; + + private static final Logger logger = Logger.getLogger(SlowMessagePersister.class); + + static SlowMessagePersister theInstance; + + private final CoreMessagePersister persister; + + private SlowMessagePersister() { + persister = CoreMessagePersister.getInstance(); + } + + static SlowMessagePersister _getInstance() 
{ + if (theInstance == null) { + theInstance = new SlowMessagePersister(); + } + return theInstance; + } + + @Override + public byte getID() { + return persister.getID(); + } + + @Override + public int getEncodeSize(Message record) { + return persister.getEncodeSize(record); + } + + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + used = true; + try { + Long delay = record.getLongProperty("delay"); + if (delay == null || delay.longValue() <= 0) { + logger.infof("encode message %d, caller=%s", record.getMessageID(), Thread.currentThread().getName()); + } else { + logger.infof("sleep %d ms before encode message %d, caller=%s", delay.longValue(), record.getMessageID(), Thread.currentThread().getName()); + Thread.sleep(delay.longValue()); + } + } catch (InterruptedException e) { + // it's ok + } + persister.encode(buffer, record); + } + + @Override + public Message decode(ActiveMQBuffer buffer, Message record) { + return persister.decode(buffer, record); + } + } + + abstract class AddRecordLoaderCallback implements LoaderCallback { + + @Override + public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction) { + + } + + @Override + public void deleteRecord(long id) { + + } + + @Override + public void updateRecord(RecordInfo info) { + + } + + @Override + public void failedTransaction(long transactionID, List records, List recordsToDelete) { + + } + } + +} \ No newline at end of file