Fixing store size calculation on KahaUpdateMessageCommand processing so
that the size won't increase inadvertently if the existing location of
the command in the journal is the same as the new location

(cherry picked from commit a5050a8bc5)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-14 09:21:23 -04:00
parent 70e2b92f1b
commit f1642b4244
2 changed files with 155 additions and 3 deletions

View File

@ -1452,10 +1452,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
); );
sd.locationIndex.put(tx, location, id); sd.locationIndex.put(tx, location, id);
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
// on first update previous is original location, on recovery/replay it may be the updated location
if(previousKeys != null && !previousKeys.location.equals(location)) { if (previousKeys != null) {
sd.locationIndex.remove(tx, previousKeys.location); //Remove the existing from the size
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
// on first update previous is original location, on recovery/replay it may be the updated location
if(!previousKeys.location.equals(location)) {
sd.locationIndex.remove(tx, previousKeys.location);
}
} }
metadata.lastUpdate = location; metadata.lastUpdate = location;
} else { } else {

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageDatabaseSizeTest {
private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
@Rule
public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
private final String payload = new String(new byte[1024]);
private BrokerService broker = null;
private final ActiveMQQueue destination = new ActiveMQQueue("Test");
private KahaDBPersistenceAdapter adapter;
protected void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
broker.start();
LOG.info("Starting broker..");
}
@Before
public void start() throws Exception {
startBroker();
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
/**
* Test that when only updating the index and not rewriting the message to the journal
* that the size doesn't change
*
* This was broken before AMQ-6356
*/
@Test
public void testUpdateMessageSameLocation() throws Exception {
final KahaDBStore store = adapter.getStore();
MessageId messageId = new MessageId("111:222:333");
ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
//Add a single message and update once so we can compare the size consistently
MessageStore messageStore = store.createQueueMessageStore(destination);
messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
messageStore.updateMessage(textMessage);
Location location = findMessageLocation(messageId.toString(), store.convert(destination));
long existingSize = messageStore.getMessageSize();
//Process the update command for the index and verify the size doesn't change
KahaUpdateMessageCommand updateMessageCommand = (KahaUpdateMessageCommand) store.load(location);
store.process(updateMessageCommand, location);
assertEquals(existingSize, messageStore.getMessageSize());
}
/**
* Test that when updating an existing message to a different location in the
* journal that the index size doesn't change
*/
@Test
public void testUpdateMessageDifferentLocation() throws Exception {
final KahaDBStore store = adapter.getStore();
ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
//Add a single message and update once so we can compare the size consistently
MessageStore messageStore = store.createQueueMessageStore(destination);
messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
messageStore.updateMessage(textMessage);
//Update again and make sure the size is the same
long existingSize = messageStore.getMessageSize();
messageStore.updateMessage(textMessage);
assertEquals(existingSize, messageStore.getMessageSize());
}
private ActiveMQTextMessage getMessage(final MessageId messageId) throws Exception {
ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
textMessage.setMessageId(messageId);
textMessage.setText(payload);
return textMessage;
}
private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
final KahaDBStore store = adapter.getStore();
return store.pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
@Override
public Location execute(Transaction tx) throws IOException {
StoredDestination sd = store.getStoredDestination(destination, tx);
Long sequence = sd.messageIdIndex.get(tx, key);
if (sequence == null) {
return null;
}
return sd.orderIndex.get(tx, sequence).location;
}
});
}
}