Don't increase the cache size for repeated pull commands for the same
destination + consumer combo since we only keep one instance in the map
at any given time.
This commit is contained in:
Timothy Bish 2014-01-17 12:29:55 -05:00
parent 0918430dc2
commit e7703f70e0
3 changed files with 192 additions and 32 deletions

View File

@ -35,10 +35,14 @@ public class MessagePull extends BaseCommand {
private MessageId messageId;
private String correlationId;
private transient boolean tracked = false;
@Override
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@Override
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessagePull(this);
}
@ -112,4 +116,12 @@ public class MessagePull extends BaseCommand {
public void setMessageId(MessageId messageId) {
this.messageId = messageId;
}
public void setTracked(boolean tracked) {
this.tracked = tracked;
}
public boolean isTracked() {
return this.tracked;
}
}

View File

@ -20,8 +20,8 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
import java.util.Map.Entry;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.TransactionRolledBackException;
@ -70,8 +70,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
private boolean trackMessages = true;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
private int currentCacheSize;
private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
private long currentCacheSize; // use long to prevent overflow for folks who set high max.
private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
@Override
protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
boolean result = currentCacheSize > maxCacheSize;
if (result) {
@ -95,6 +97,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
this.info = info;
}
@Override
public void onResponse(Command response) {
ConnectionId connectionId = info.getConnectionId();
ConnectionState cs = connectionStates.get(connectionId);
@ -105,11 +108,11 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
public PrepareReadonlyTransactionAction(TransactionInfo info) {
super(info);
}
@Override
public void onResponse(Command command) {
if (command instanceof IntegerResponse) {
IntegerResponse response = (IntegerResponse) command;
@ -122,11 +125,16 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
/**
*
* Entry point for all tracked commands in the tracker. Commands should be tracked before
* there is an attempt to send them on the wire. Upon a successful send of a command it is
* necessary to call the trackBack method to complete the tracking of the given command.
*
* @param command
* The command that is to be tracked by this tracker.
*
* @return null if the command is not state tracked.
* @throws IOException
*
* @throws IOException if an error occurs during setup of the tracking operation.
*/
public Tracked track(Command command) throws IOException {
try {
@ -138,6 +146,14 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
/**
* Completes the two phase tracking operation for a command that is sent on the wire. Once
* the command is sent successfully to complete the tracking operation or otherwise update
* the state of the tracker.
*
* @param command
* The command that was previously provided to the track method.
*/
public void trackBack(Command command) {
if (command != null) {
if (trackMessages && command.isMessage()) {
@ -146,8 +162,12 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
currentCacheSize = currentCacheSize + message.getSize();
}
} else if (command instanceof MessagePull) {
// just needs to be a rough estimate of size, ~4 identifiers
currentCacheSize += MESSAGE_PULL_SIZE;
// We only track one MessagePull per consumer so only add to cache size
// when the command has been marked as tracked.
if (((MessagePull)command).isTracked()) {
// just needs to be a rough estimate of size, ~4 identifiers
currentCacheSize += MESSAGE_PULL_SIZE;
}
}
}
}
@ -171,8 +191,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
restoreTransactions(transport, connectionState);
}
}
//now flush messages
for (Command msg:messageCache.values()) {
// now flush messages and MessagePull commands.
for (Command msg : messageCache.values()) {
if (LOG.isDebugEnabled()) {
LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
}
@ -319,6 +340,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
}
}
@Override
public Response processAddDestination(DestinationInfo info) {
if (info != null) {
ConnectionState cs = connectionStates.get(info.getConnectionId());
@ -329,6 +351,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processRemoveDestination(DestinationInfo info) {
if (info != null) {
ConnectionState cs = connectionStates.get(info.getConnectionId());
@ -339,6 +362,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processAddProducer(ProducerInfo info) {
if (info != null && info.getProducerId() != null) {
SessionId sessionId = info.getProducerId().getParentId();
@ -358,6 +382,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processRemoveProducer(ProducerId id) {
if (id != null) {
SessionId sessionId = id.getParentId();
@ -377,6 +402,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processAddConsumer(ConsumerInfo info) {
if (info != null) {
SessionId sessionId = info.getConsumerId().getParentId();
@ -396,6 +422,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
if (id != null) {
SessionId sessionId = id.getParentId();
@ -416,6 +443,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processAddSession(SessionInfo info) {
if (info != null) {
ConnectionId connectionId = info.getSessionId().getParentId();
@ -429,6 +457,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
if (id != null) {
ConnectionId connectionId = id.getParentId();
@ -442,6 +471,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processAddConnection(ConnectionInfo info) {
if (info != null) {
connectionStates.put(info.getConnectionId(), new ConnectionState(info));
@ -449,6 +479,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
if (id != null) {
connectionStates.remove(id);
@ -456,6 +487,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return TRACKED_RESPONSE_MARKER;
}
@Override
public Response processMessage(Message send) throws Exception {
if (send != null) {
if (trackTransactions && send.getTransactionId() != null) {
@ -486,6 +518,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
@Override
public Response processBeginTransaction(TransactionInfo info) {
if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId();
@ -502,6 +535,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
@Override
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@ -519,6 +553,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
@Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@ -536,6 +571,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
@Override
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@ -553,6 +589,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
@Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@ -570,6 +607,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
return null;
}
@Override
public Response processEndTransaction(TransactionInfo info) throws Exception {
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
@ -592,7 +630,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
if (pull != null) {
// leave a single instance in the cache
final String id = pull.getDestination() + "::" + pull.getConsumerId();
messageCache.put(id.intern(), pull);
if (messageCache.put(id.intern(), pull) == null) {
// Only marked as tracked if this is the first request we've seen.
pull.setTracked(true);
}
}
return null;
}
@ -661,6 +702,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
this.maxCacheSize = maxCacheSize;
}
/**
* @return the current cache size for the Message and MessagePull Command cache.
*/
public long getCurrentCacheSize() {
return this.currentCacheSize;
}
public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
ConnectionState connectionState = connectionStates.get(connectionId);
if (connectionState != null) {

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.state;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.UUID;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.SessionId;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ConnectionStateTrackerTest {
private final ActiveMQQueue queue = new ActiveMQQueue("Test");
private ConnectionId testConnectionId;
private SessionId testSessionId;
private int connectionId = 0;
private int sessionId = 0;
private int consumerId = 0;
@Before
public void setUp() throws Exception {
testConnectionId = createConnectionId();
testSessionId = createSessionId(testConnectionId);
}
@After
public void tearDown() throws Exception {
}
@Test
public void testCacheSizeWithMessagePulls() throws IOException {
final ConsumerId consumer1 = createConsumerId(testSessionId);
ConnectionStateTracker tracker = new ConnectionStateTracker();
assertEquals(0, tracker.getCurrentCacheSize());
MessagePull pullCommand = createPullCommand(consumer1);
tracker.track(pullCommand);
assertEquals(0, tracker.getCurrentCacheSize());
tracker.trackBack(pullCommand);
long currentSize = tracker.getCurrentCacheSize();
assertTrue(currentSize > 0);
pullCommand = createPullCommand(consumer1);
tracker.track(pullCommand);
tracker.trackBack(pullCommand);
assertEquals(currentSize, tracker.getCurrentCacheSize());
}
private MessagePull createPullCommand(ConsumerId id) {
MessagePull pullCommand = new MessagePull();
pullCommand.setDestination(queue);
pullCommand.setConsumerId(id);
return pullCommand;
}
private ConnectionId createConnectionId() {
ConnectionId id = new ConnectionId();
id.setValue(UUID.randomUUID() + ":" + connectionId++);
return id;
}
private SessionId createSessionId(ConnectionId connectionId) {
return new SessionId(connectionId, sessionId++);
}
private ConsumerId createConsumerId(SessionId sessionId) {
return new ConsumerId(sessionId, consumerId++);
}
}