This commit is contained in:
Clebert Suconic 2020-03-16 16:44:01 -04:00
commit 327af99373
7 changed files with 370 additions and 41 deletions

View File

@ -446,21 +446,17 @@ public class TypedProperties {
}
public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools,
boolean replaceExisting) {
final TypedPropertiesDecoderPools keyValuePools) {
byte b = buffer.readByte();
if (b == DataConstants.NULL) {
if (replaceExisting) {
properties = null;
size = 0;
}
properties = null;
size = 0;
} else {
int numHeaders = buffer.readInt();
if (replaceExisting || properties == null) {
//optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
properties = new HashMap<>(numHeaders, 1.0f);
}
size = properties.size();
//optimize the case of no collisions to avoid any resize (it doubles the map size!!!) when load factor is reached
properties = new HashMap<>(numHeaders, 1.0f);
size = 0;
for (int i = 0; i < numHeaders; i++) {
final SimpleString key = SimpleString.readSimpleString(buffer, keyValuePools == null ? null : keyValuePools.getPropertyKeysPool());
@ -533,10 +529,6 @@ public class TypedProperties {
}
}
public synchronized void decode(final ByteBuf buffer, final TypedPropertiesDecoderPools keyValuePools) {
decode(buffer, keyValuePools, true);
}
public void decode(final ByteBuf buffer) {
decode(buffer, null);
}

View File

@ -32,6 +32,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.apache.activemq.artemis.utils.collections.TypedProperties.searchProperty;
import static org.hamcrest.Matchers.greaterThan;
public class TypedPropertiesTest {
@ -96,8 +97,12 @@ public class TypedPropertiesTest {
Assert.assertTrue(props.containsProperty(key));
Assert.assertNotNull(props.getProperty(key));
Assert.assertThat(props.getEncodeSize(), greaterThan(0));
props.clear();
Assert.assertEquals(1, props.getEncodeSize());
Assert.assertFalse(props.containsProperty(key));
Assert.assertNull(props.getProperty(key));
}

View File

@ -17,8 +17,11 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -72,25 +75,36 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record, CoreMessageObjectPools pool) {
AMQPMessage message = (AMQPMessage) super.decode(buffer, record, pool);
int size = buffer.readInt();
if (size != 0) {
// message::setAddress could have populated extra properties
// hence, we can safely replace the value on the properties
// if it has been encoded differently in the rest of the buffer
TypedProperties existingExtraProperties = message.getExtraProperties();
TypedProperties extraProperties = existingExtraProperties;
if (existingExtraProperties == null) {
extraProperties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
}
extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null, existingExtraProperties == null);
if (extraProperties != existingExtraProperties) {
message.setExtraProperties(extraProperties);
}
public Message decode(ActiveMQBuffer buffer, Message ignore, CoreMessageObjectPools pool) {
// IMPORTANT:
// This is a sightly modified copy of the AMQPMessagePersister::decode body
// to save extraProperties to be created twice: this would kill GC during journal loading
long id = buffer.readLong();
long format = buffer.readLong();
// this instance is being used only if there are no extraProperties or just for debugging purposes:
// on journal loading pool shouldn't be null so it shouldn't create any garbage.
final SimpleString address;
if (pool == null) {
address = buffer.readNullableSimpleString();
} else {
address = SimpleString.readNullableSimpleString(buffer.byteBuf(), pool.getAddressDecoderPool());
}
return message;
AMQPStandardMessage record = new AMQPStandardMessage(format);
record.reloadPersistence(buffer, pool);
record.setMessageID(id);
// END of AMQPMessagePersister::decode body copy
int size = buffer.readInt();
if (size != 0) {
final TypedProperties extraProperties = record.createExtraProperties();
extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null);
assert Objects.equals(address, extraProperties.getSimpleStringProperty(AMQPMessage.ADDRESS_PROPERTY)) :
"AMQPMessage address and extraProperties address should match";
} else if (address != null) {
// this shouldn't really happen: this code path has been preserved
// because of the behaviour before "ARTEMIS-2617 Improve AMQP Journal loading"
record.setAddress(address);
}
return record;
}
}

View File

@ -17,7 +17,10 @@
package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@ -58,7 +61,7 @@ public class PageReaderTest extends ActiveMQTestBase {
int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
PagePosition pagePosition = startPosition.nextPagePostion();
assertEquals(offsets[i], pagePosition.getFileOffset());
assertEquals("Message " + i + " has wrong offset", offsets[i], pagePosition.getFileOffset());
pagedMessage = pageReader.getMessage(pagePosition);
}
assertNotNull(pagedMessage);
@ -69,6 +72,30 @@ public class PageReaderTest extends ActiveMQTestBase {
pageReader.close();
}
@Test
public void testShortPageReadMessage() throws Exception {
recreateDirectory(getTestDir());
int num = 2;
int[] offsets = createPage(num);
PageReader pageReader = getPageReader();
PagedMessage[] pagedMessages = pageReader.getMessages();
assertEquals(pagedMessages.length, num);
PagePosition pagePosition = new PagePositionImpl(10, 0);
PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition);
assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize());
int nextFileOffset = offsets[0] + firstPagedMessage.getEncodeSize() + Page.SIZE_RECORD;
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, 0));
PagePosition nextPagePosition = startPosition.nextPagePostion();
assertEquals("Message 1 has a wrong offset", offsets[1], nextPagePosition.getFileOffset());
PagedMessage pagedMessage = pageReader.getMessage(nextPagePosition);
assertNotNull(pagedMessage);
assertEquals(pagedMessage.getMessage().getMessageID(), 1);
assertEquals(pagedMessages[1].getMessage().getMessageID(), 1);
pageReader.close();
}
@Test
public void testPageReadMessageBeyondPage() throws Exception {
recreateDirectory(getTestDir());
@ -113,15 +140,12 @@ public class PageReaderTest extends ActiveMQTestBase {
Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
page.open();
SimpleString simpleDestination = new SimpleString("Test");
final int msgSize = 100;
final byte[] content = new byte[msgSize];
Arrays.fill(content, (byte) 'b');
int[] offsets = new int[num];
for (int i = 0; i < num; i++) {
ICoreMessage msg = new CoreMessage().setMessageID(i).initBuffer(1024);
for (int j = 0; j < 100; j++) {
msg.getBodyBuffer().writeByte((byte) 'b');
}
msg.setAddress(simpleDestination);
Message msg = createMessage(simpleDestination, i, content);
offsets[i] = (int)page.getFile().position();
page.write(new PagedMessageImpl(msg, new long[0]));
@ -131,6 +155,17 @@ public class PageReaderTest extends ActiveMQTestBase {
return offsets;
}
protected Message createMessage(SimpleString address, int msgId, byte[] content) {
ICoreMessage msg = new CoreMessage().setMessageID(msgId).initBuffer(1024);
for (byte b : content) {
msg.getBodyBuffer().writeByte(b);
}
msg.setAddress(address);
return msg;
}
private PageReader getPageReader() throws Exception {
SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
SequentialFile file = factory.createSequentialFile("00010.page");

View File

@ -287,6 +287,26 @@
</args>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-paging</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/paging</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/paging</instance>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>

View File

@ -0,0 +1,186 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<message-expiry-scan-period>1000</message-expiry-scan-period>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size>
-->
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;useKQueue=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true;useKQueue=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true;useKQueue=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>10000</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.paging;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SmokePagingTest extends SmokeTestBase {
public static final String SERVER_NAME_0 = "paging";
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
disableCheckThread();
startServer(SERVER_NAME_0, 0, 30000);
}
@Test
public void testAMQPOnCLI() throws Exception {
String protocol = "amqp";
int NUMBER_OF_MESSAGES = 5000;
internalReceive(protocol, NUMBER_OF_MESSAGES);
}
@Test
public void testCoreOnCLI() throws Exception {
String protocol = "core";
int NUMBER_OF_MESSAGES = 5000;
internalReceive(protocol, NUMBER_OF_MESSAGES);
}
private void internalReceive(String protocol, int NUMBER_OF_MESSAGES) throws Exception {
Producer producer = (Producer)new Producer().setMessageSize(1000).setMessageCount(NUMBER_OF_MESSAGES).setTxBatchSize(1000);
producer.setProtocol(protocol);
producer.setSilentInput(true);
producer.execute(new ActionContext());
Consumer consumer = new Consumer();
consumer.setMessageCount(NUMBER_OF_MESSAGES);
consumer.setProtocol(protocol);
consumer.setSilentInput(true);
consumer.setReceiveTimeout(2000);
consumer.setBreakOnNull(true);
int consumed = (int)consumer.execute(new ActionContext());
Assert.assertEquals(NUMBER_OF_MESSAGES, consumed);
}
}