ARTEMIS-3261 Expanding verification to journal compacting counters
After testing a production outage situation I still encountered issues on deciding the journal should be compacted. This is addressing these issues.
This commit is contained in:
parent
62395dcd44
commit
42405fedcf
|
@ -32,6 +32,10 @@ public interface JournalFile {
|
|||
|
||||
void decPosCount();
|
||||
|
||||
void incAddRecord();
|
||||
|
||||
int getAddRecord();
|
||||
|
||||
void addSize(int bytes);
|
||||
|
||||
void decSize(int bytes);
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.Map.Entry;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -34,9 +35,13 @@ public class JournalFileImpl implements JournalFile {
|
|||
|
||||
private long offset;
|
||||
|
||||
private final AtomicInteger posCount = new AtomicInteger(0);
|
||||
private static final AtomicIntegerFieldUpdater<JournalFileImpl> posCountUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "posCountField");
|
||||
private static final AtomicIntegerFieldUpdater<JournalFileImpl> addRecordUpdate = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "addRecordField");
|
||||
private static final AtomicIntegerFieldUpdater<JournalFileImpl> liveBytesUpdater = AtomicIntegerFieldUpdater.newUpdater(JournalFileImpl.class, "liveBytesField");
|
||||
|
||||
private final AtomicInteger liveBytes = new AtomicInteger(0);
|
||||
private volatile int posCountField = 0;
|
||||
private volatile int addRecordField = 0;
|
||||
private volatile int liveBytesField = 0;
|
||||
|
||||
// Flags to be used by determine if the journal file can be reclaimed
|
||||
private boolean posReclaimCriteria = false;
|
||||
|
@ -62,7 +67,7 @@ public class JournalFileImpl implements JournalFile {
|
|||
|
||||
@Override
|
||||
public int getPosCount() {
|
||||
return posCount.intValue();
|
||||
return posCountUpdater.get(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -135,12 +140,22 @@ public class JournalFileImpl implements JournalFile {
|
|||
|
||||
@Override
|
||||
public void incPosCount() {
|
||||
posCount.incrementAndGet();
|
||||
posCountUpdater.incrementAndGet(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incAddRecord() {
|
||||
addRecordUpdate.incrementAndGet(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getAddRecord() {
|
||||
return addRecordUpdate.get(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decPosCount() {
|
||||
posCount.decrementAndGet();
|
||||
posCountUpdater.decrementAndGet(this);
|
||||
}
|
||||
|
||||
public long getOffset() {
|
||||
|
@ -191,17 +206,17 @@ public class JournalFileImpl implements JournalFile {
|
|||
|
||||
@Override
|
||||
public void addSize(final int bytes) {
|
||||
liveBytes.addAndGet(bytes);
|
||||
liveBytesUpdater.addAndGet(this, bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decSize(final int bytes) {
|
||||
liveBytes.addAndGet(-bytes);
|
||||
liveBytesUpdater.addAndGet(this, -bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getLiveSize() {
|
||||
return liveBytes.get();
|
||||
return liveBytesUpdater.get(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -91,15 +91,45 @@ import static org.apache.activemq.artemis.core.journal.impl.Reclaimer.scan;
|
|||
* <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
|
||||
*/
|
||||
public class JournalImpl extends JournalBase implements TestableJournal, JournalRecordProvider {
|
||||
private static final Logger logger = Logger.getLogger(JournalImpl.class);
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
/**
|
||||
* this is a factor where when you have more than UPDATE_FACTOR updates for every ADD.
|
||||
*
|
||||
* When this happens we should issue a compacting event.
|
||||
*
|
||||
* I don't foresee users needing to configure this value. However if this ever happens we would have a system property aligned for this.
|
||||
*
|
||||
* With that being said, if you needed this, please raise an issue on why you needed to use this, so we may eventually add it to broker.xml when a real
|
||||
* use case would determine the configuration exposed in there.
|
||||
*
|
||||
* To update this value, define a System Property org.apache.activemq.artemis.core.journal.impl.JournalImpl.UPDATE_FACTOR=YOUR VALUE
|
||||
*
|
||||
* */
|
||||
public static final double UPDATE_FACTOR;
|
||||
|
||||
static {
|
||||
String UPDATE_FACTOR_STR = System.getProperty(JournalImpl.class.getName() + ".UPDATE_FACTOR");
|
||||
double value;
|
||||
try {
|
||||
if (UPDATE_FACTOR_STR == null) {
|
||||
value = 100;
|
||||
} else {
|
||||
value = Double.parseDouble(UPDATE_FACTOR_STR);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
value = 100;
|
||||
}
|
||||
|
||||
UPDATE_FACTOR = value;
|
||||
}
|
||||
|
||||
public static final int FORMAT_VERSION = 2;
|
||||
|
||||
private static final int[] COMPATIBLE_VERSIONS = new int[]{1};
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
private static final Logger logger = Logger.getLogger(JournalImpl.class);
|
||||
|
||||
// The sizes of primitive types
|
||||
|
||||
|
@ -2312,10 +2342,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
|
||||
long totalLiveSize = 0;
|
||||
|
||||
long updateCount = 0, addRecord = 0;
|
||||
|
||||
for (JournalFile file : dataFiles) {
|
||||
totalLiveSize += file.getLiveSize();
|
||||
updateCount += file.getPosCount();
|
||||
addRecord += file.getAddRecord();
|
||||
}
|
||||
|
||||
|
||||
if (dataFiles.length > compactMinFiles && addRecord > 0 && updateCount > 0) {
|
||||
double updateFactor = updateCount / addRecord;
|
||||
|
||||
if (updateFactor > UPDATE_FACTOR) { // this means every add records with at least 10 records
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("There are " + addRecord + " records, with " + updateCount + " towards them. UpdateCound / AddCount = " + updateFactor + ", being greater than " + UPDATE_FACTOR + " meaning we have to schedule compacting");
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("There are " + addRecord + " records, with " + updateCount + " towards them. UpdateCound / AddCount = " + updateFactor + ", which is lower than " + UPDATE_FACTOR + " meaning we are ok to leave these records");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
long totalBytes = dataFiles.length * (long) fileSize;
|
||||
|
||||
long compactMargin = (long) (totalBytes * compactPercentage);
|
||||
|
|
|
@ -42,6 +42,8 @@ public class JournalRecord {
|
|||
addFile.incPosCount();
|
||||
|
||||
addFile.addSize(size);
|
||||
|
||||
addFile.incAddRecord();
|
||||
}
|
||||
|
||||
void addUpdateFile(final JournalFile updateFile, final int bytes) {
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.journal.impl;
|
||||
|
||||
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class VerifyUpdateFactorSystemProperty {
|
||||
|
||||
public static void main(String[] arg) {
|
||||
|
||||
try {
|
||||
Assert.assertEquals(33.0, JournalImpl.UPDATE_FACTOR, 0);
|
||||
System.exit(0);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
System.exit(100);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateUpdateRecordProperty() throws Exception {
|
||||
Process process = SpawnedVMSupport.spawnVM(VerifyUpdateFactorSystemProperty.class.getName(), new String[]{"-D" + JournalImpl.class.getName() + ".UPDATE_FACTOR=33.0"}, new String[]{});
|
||||
Assert.assertEquals(0, process.waitFor());
|
||||
}
|
||||
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
# Additional logger names to configure (root logger is always configured)
|
||||
# Root logger option
|
||||
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message
|
||||
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.smoke,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message
|
||||
|
||||
# Root logger level
|
||||
logger.level=INFO
|
||||
|
@ -33,6 +33,7 @@ logger.org.apache.activemq.artemis.tests.integration.level=DEBUG
|
|||
logger.org.apache.activemq.artemis.tests.level=DEBUG
|
||||
logger.org.apache.activemq.artemis.tests.unit.level=DEBUG
|
||||
logger.org.apache.activemq.artemis.jms.tests.level=DEBUG
|
||||
logger.org.apache.activemq.artemis.tests.smoke.level=DEBUG
|
||||
|
||||
|
||||
# Root logger handlers
|
||||
|
|
|
@ -765,6 +765,24 @@
|
|||
<configuration>${basedir}/target/classes/servers/MaxQueueResourceTest</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
||||
<!-- used on InfiniteRedeliverySmokeTest -->
|
||||
<execution>
|
||||
<phase>test-compile</phase>
|
||||
<id>createBrokerInfiniteRedelivery</id>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<allowAnonymous>true</allowAnonymous>
|
||||
<user>A</user>
|
||||
<password>A</password>
|
||||
<noWeb>true</noWeb>
|
||||
<instance>${basedir}/target/infinite-redelivery</instance>
|
||||
<configuration>${basedir}/target/classes/servers/infinite-redelivery</configuration>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/largemessages</large-messages-directory>
|
||||
|
||||
<journal-pool-files>10</journal-pool-files>
|
||||
<journal-min-files>2</journal-min-files>
|
||||
<journal-compact-min-files>3</journal-compact-min-files>
|
||||
<persist-delivery-count-before-delivery>false</persist-delivery-count-before-delivery>
|
||||
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<!-- Acceptors -->
|
||||
<acceptors>
|
||||
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
|
||||
</acceptors>
|
||||
|
||||
<!-- Other config -->
|
||||
|
||||
<security-settings>
|
||||
<!--security for example queue-->
|
||||
<security-setting match="#">
|
||||
<permission roles="guest" type="createDurableQueue"/>
|
||||
<permission roles="guest" type="deleteDurableQueue"/>
|
||||
<permission roles="guest" type="createNonDurableQueue"/>
|
||||
<permission roles="guest" type="deleteNonDurableQueue"/>
|
||||
<permission roles="guest" type="consume"/>
|
||||
<permission roles="guest" type="send"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!--override the max-delivery-attempts and dead letter address for the example queue-->
|
||||
<address-setting match="testQueue">
|
||||
<dead-letter-address>deadLetterQueue</dead-letter-address>
|
||||
<max-delivery-attempts>-1</max-delivery-attempts>
|
||||
<redelivery-delay>1</redelivery-delay>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="deadLetterQueue">
|
||||
<anycast>
|
||||
<queue name="deadLetterQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="testQueue">
|
||||
<anycast>
|
||||
<queue name="testQueue"/>
|
||||
</anycast>
|
||||
</address>
|
||||
</addresses>
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,89 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# Additional logger names to configure (root logger is always configured)
|
||||
# Root logger option
|
||||
loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.core.journal.impl.JournalImpl
|
||||
|
||||
# Root logger level
|
||||
logger.level=INFO
|
||||
# ActiveMQ Artemis logger levels
|
||||
logger.org.apache.activemq.artemis.core.server.level=INFO
|
||||
logger.org.apache.activemq.artemis.journal.level=INFO
|
||||
logger.org.apache.activemq.artemis.utils.level=INFO
|
||||
|
||||
# DEBUG on this level will give you a lot of information on when compacting is happening
|
||||
logger.org.apache.activemq.artemis.core.journal.impl.JournalImpl.level=DEBUG
|
||||
|
||||
# if you have issues with CriticalAnalyzer, setting this as TRACE would give you extra troubleshooting information.
|
||||
# but do not use it regularly as it would incur in some extra CPU usage for this diagnostic.
|
||||
logger.org.apache.activemq.artemis.utils.critical.level=INFO
|
||||
|
||||
logger.org.apache.activemq.artemis.jms.level=INFO
|
||||
logger.org.apache.activemq.artemis.integration.bootstrap.level=INFO
|
||||
logger.org.eclipse.jetty.level=WARN
|
||||
# Root logger handlers
|
||||
logger.handlers=FILE,CONSOLE
|
||||
|
||||
# to enable audit change the level to INFO
|
||||
logger.org.apache.activemq.audit.base.level=ERROR
|
||||
logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
|
||||
logger.org.apache.activemq.audit.base.useParentHandlers=false
|
||||
|
||||
logger.org.apache.activemq.audit.resource.level=ERROR
|
||||
logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
|
||||
logger.org.apache.activemq.audit.resource.useParentHandlers=false
|
||||
|
||||
logger.org.apache.activemq.audit.message.level=ERROR
|
||||
logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
|
||||
logger.org.apache.activemq.audit.message.useParentHandlers=false
|
||||
|
||||
# Console handler configuration
|
||||
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
|
||||
handler.CONSOLE.properties=autoFlush
|
||||
handler.CONSOLE.level=DEBUG
|
||||
handler.CONSOLE.autoFlush=true
|
||||
handler.CONSOLE.formatter=PATTERN
|
||||
|
||||
# File handler configuration
|
||||
handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
|
||||
handler.FILE.level=DEBUG
|
||||
handler.FILE.properties=suffix,append,autoFlush,fileName
|
||||
handler.FILE.suffix=.yyyy-MM-dd
|
||||
handler.FILE.append=true
|
||||
handler.FILE.autoFlush=true
|
||||
handler.FILE.fileName=${artemis.instance}/log/artemis.log
|
||||
handler.FILE.formatter=PATTERN
|
||||
|
||||
# Formatter pattern configuration
|
||||
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
|
||||
formatter.PATTERN.properties=pattern
|
||||
formatter.PATTERN.pattern=%d %-5p [%c] %s%E%n
|
||||
|
||||
#Audit logger
|
||||
handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
|
||||
handler.AUDIT_FILE.level=INFO
|
||||
handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
|
||||
handler.AUDIT_FILE.suffix=.yyyy-MM-dd
|
||||
handler.AUDIT_FILE.append=true
|
||||
handler.AUDIT_FILE.autoFlush=true
|
||||
handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
|
||||
handler.AUDIT_FILE.formatter=AUDIT_PATTERN
|
||||
|
||||
formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
|
||||
formatter.AUDIT_PATTERN.properties=pattern
|
||||
formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n
|
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.smoke.infinite;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class InfiniteRedeliverySmokeTest extends SmokeTestBase {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(InfiniteRedeliverySmokeTest.class);
|
||||
|
||||
public static final String SERVER_NAME_0 = "infinite-redelivery";
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
cleanupData(SERVER_NAME_0);
|
||||
startServer(SERVER_NAME_0, 0, 30000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateRedeliveries() throws Exception {
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
Queue queue = session.createQueue("testQueue");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
|
||||
TextMessage message = session.createTextMessage("this is a test");
|
||||
for (int i = 0; i < 5000; i++) {
|
||||
producer.send(message);
|
||||
}
|
||||
session.commit();
|
||||
|
||||
connection.start();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
File journalLocation = new File(getServerLocation(SERVER_NAME_0) + "/data/journal");
|
||||
SequentialFileFactory fileFactory = new NIOSequentialFileFactory(journalLocation, 1);
|
||||
|
||||
for (int i = 0; i < 500; i++) {
|
||||
if (i % 10 == 0) logger.debug("Redelivery " + i);
|
||||
for (int j = 0; j < 5000; j++) {
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
session.rollback();
|
||||
|
||||
int numberOfFiles = fileFactory.listFiles("amq").size();
|
||||
|
||||
// it should be actually 10, However if a future rule changes it to allow removing files I'm ok with that
|
||||
Assert.assertTrue("there are not enough files on journal", numberOfFiles >= 2);
|
||||
// it should be max 10 actually, I'm just leaving some space for future changes,
|
||||
// as the real test I'm after here is the broker should clean itself up
|
||||
Wait.assertTrue("there are too many files created", () -> fileFactory.listFiles("amq").size() <= 20);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateJournalOnRollbackSend() throws Exception {
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
Queue queue = session.createQueue("testQueue");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
|
||||
|
||||
File journalLocation = new File(getServerLocation(SERVER_NAME_0) + "/data/journal");
|
||||
SequentialFileFactory fileFactory = new NIOSequentialFileFactory(journalLocation, 1);
|
||||
TextMessage message = session.createTextMessage("This is a test");
|
||||
producer.send(message); // we will always have one message behind
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
for (int i = 0; i < 500; i++) {
|
||||
if (i % 10 == 0) logger.debug("Rollback send " + i);
|
||||
for (int j = 0; j < 5000; j++) {
|
||||
producer.send(message);
|
||||
}
|
||||
if (i % 100 == 0) {
|
||||
session.commit();
|
||||
for (int c = 0; c < 5000; c++) {
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
session.commit();
|
||||
Assert.assertNotNull(consumer.receive(5000)); // there's one message behind
|
||||
session.rollback(); // we will keep the one message behind
|
||||
} else {
|
||||
session.rollback();
|
||||
}
|
||||
int numberOfFiles = fileFactory.listFiles("amq").size();
|
||||
// it should be actually 10, However if a future rule changes it to allow removing files I'm ok with that
|
||||
Assert.assertTrue("there are not enough files on journal", numberOfFiles >= 2);
|
||||
// it should be max 10 actually, I'm just leaving some space for future changes,
|
||||
// as the real test I'm after here is the broker should clean itself up
|
||||
Wait.assertTrue(() -> fileFactory.listFiles("amq").size() <= 20);
|
||||
Assert.assertTrue("there are too many files created", numberOfFiles <= 20);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -744,6 +744,16 @@ public class ReclaimerTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incAddRecord() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getAddRecord() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incNegCount(final JournalFile file) {
|
||||
incNegCount(file, 1);
|
||||
|
|
Loading…
Reference in New Issue