mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3805 - duplicate dispatch to durable sub with concurrent send transaction commit and activate. fixed up the use of audit through an activate/deactivate such that duplicate dispatch is suppressed at source in this case
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1325722 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0be51cbfe2
commit
53b29a206b
|
@ -163,7 +163,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deactivate(boolean keepDurableSubsActive) throws Exception {
|
public void deactivate(boolean keepDurableSubsActive) throws Exception {
|
||||||
LOG.debug("Deactivating " + this);
|
LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
|
||||||
active.set(false);
|
active.set(false);
|
||||||
offlineTimestamp.set(System.currentTimeMillis());
|
offlineTimestamp.set(System.currentTimeMillis());
|
||||||
this.usageManager.getMemoryUsage().removeUsageListener(this);
|
this.usageManager.getMemoryUsage().removeUsageListener(this);
|
||||||
|
@ -187,9 +187,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
||||||
} else {
|
} else {
|
||||||
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
|
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
|
||||||
}
|
}
|
||||||
if (keepDurableSubsActive&& pending.isTransient()) {
|
if (keepDurableSubsActive && pending.isTransient()) {
|
||||||
synchronized (pending) {
|
synchronized (pending) {
|
||||||
pending.addMessageFirst(node);
|
pending.addMessageFirst(node);
|
||||||
|
pending.rollback(node.getMessageId());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
node.decrementReferenceCount();
|
node.decrementReferenceCount();
|
||||||
|
|
|
@ -64,7 +64,6 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
|
||||||
|
|
||||||
public synchronized void stop() throws Exception {
|
public synchronized void stop() throws Exception {
|
||||||
started=false;
|
started=false;
|
||||||
audit=null;
|
|
||||||
gc();
|
gc();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -243,6 +243,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
batchList.clear();
|
batchList.clear();
|
||||||
clearIterator(false);
|
clearIterator(false);
|
||||||
batchResetNeeded = true;
|
batchResetNeeded = true;
|
||||||
|
// wonder do we need to determine size here, it may change before restart
|
||||||
resetSize();
|
resetSize();
|
||||||
setCacheEnabled(false);
|
setCacheEnabled(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,16 +93,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
||||||
if (isStarted()) {
|
if (isStarted()) {
|
||||||
if (subscription.isKeepDurableSubsActive()) {
|
if (subscription.isKeepDurableSubsActive()) {
|
||||||
super.gc();
|
super.gc();
|
||||||
super.getMessageAudit().clear();
|
|
||||||
for (PendingMessageCursor tsp : storePrefetches) {
|
for (PendingMessageCursor tsp : storePrefetches) {
|
||||||
tsp.gc();
|
tsp.gc();
|
||||||
tsp.getMessageAudit().clear();
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
super.stop();
|
super.stop();
|
||||||
for (PendingMessageCursor tsp : storePrefetches) {
|
for (PendingMessageCursor tsp : storePrefetches) {
|
||||||
tsp.stop();
|
tsp.stop();
|
||||||
}
|
}
|
||||||
|
getMessageAudit().clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,6 +119,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString();
|
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,942 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Vector;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
|
import org.apache.activemq.util.ThreadTracker;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.class);
|
||||||
|
public static final long RUNTIME = 5 * 60 * 1000;
|
||||||
|
|
||||||
|
public static final int SERVER_SLEEP = 500; // max
|
||||||
|
public static final int CARGO_SIZE = 600; // max
|
||||||
|
|
||||||
|
public static final int MAX_CLIENTS = 2;
|
||||||
|
public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000);
|
||||||
|
public static final Random CLIENT_ONLINE = new Random(30 * 1000, 40 * 1000);
|
||||||
|
public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 10 * 1000);
|
||||||
|
|
||||||
|
public static final int CLIENT_OFFLINE_DURING_COMMIT = 2; // random(x) == x
|
||||||
|
|
||||||
|
public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB;
|
||||||
|
|
||||||
|
public static final long BROKER_RESTART = -2 * 60 * 1000;
|
||||||
|
|
||||||
|
public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
|
||||||
|
public static final boolean CHECK_REDELIVERY = true;
|
||||||
|
|
||||||
|
private BrokerService broker;
|
||||||
|
private ActiveMQTopic topic;
|
||||||
|
|
||||||
|
private ClientManager clientManager;
|
||||||
|
private Server server;
|
||||||
|
private HouseKeeper houseKeeper;
|
||||||
|
|
||||||
|
private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(
|
||||||
|
true);
|
||||||
|
private int restartCount = 0;
|
||||||
|
private final AtomicInteger onlineCount = new AtomicInteger(0);
|
||||||
|
static final Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||||
|
|
||||||
|
// long form of test that found https://issues.apache.org/jira/browse/AMQ-3805
|
||||||
|
@Ignore ("short version in org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoDuplicateOnConcurrentSendTranCommitAndActivate")
|
||||||
|
@Test
|
||||||
|
public void testProcess() {
|
||||||
|
try {
|
||||||
|
server.start();
|
||||||
|
clientManager.start();
|
||||||
|
|
||||||
|
if (ALLOW_SUBSCRIPTION_ABANDONMENT)
|
||||||
|
houseKeeper.start();
|
||||||
|
|
||||||
|
if (BROKER_RESTART <= 0)
|
||||||
|
Thread.sleep(RUNTIME);
|
||||||
|
else {
|
||||||
|
long end = System.currentTimeMillis() + RUNTIME;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now > end)
|
||||||
|
break;
|
||||||
|
|
||||||
|
now = end - now;
|
||||||
|
now = now < BROKER_RESTART ? now : BROKER_RESTART;
|
||||||
|
Thread.sleep(now);
|
||||||
|
|
||||||
|
restartBroker();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
exit("ProcessTest.testProcess failed.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
//allow the clients to unsubscribe before finishing
|
||||||
|
clientManager.setEnd(true);
|
||||||
|
try {
|
||||||
|
Thread.sleep(600000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
exit("ProcessTest.testProcess failed.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
processLock.writeLock().lock();
|
||||||
|
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||||
|
LOG.info("DONE.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void restartBroker() throws Exception {
|
||||||
|
LOG.info("Broker restart: waiting for components.");
|
||||||
|
|
||||||
|
processLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
destroyBroker();
|
||||||
|
startBroker(false);
|
||||||
|
|
||||||
|
restartCount++;
|
||||||
|
LOG.info("Broker restarted. count: " + restartCount);
|
||||||
|
} finally {
|
||||||
|
processLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates batch of messages in a transaction periodically. The last message
|
||||||
|
* in the transaction is always a special message what contains info about
|
||||||
|
* the whole transaction.
|
||||||
|
* <p>
|
||||||
|
* Notifies the clients about the created messages also.
|
||||||
|
*/
|
||||||
|
final class Server extends Thread {
|
||||||
|
|
||||||
|
final String url = "vm://"
|
||||||
|
+ DurableSubProcessConcurrentCommitActivateNoDuplicateTest.getName()
|
||||||
|
+ "?"
|
||||||
|
+ "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&"
|
||||||
|
+ "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&"
|
||||||
|
+ "jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&"
|
||||||
|
+ "jms.alwaysSyncSend=true&jms.dispatchAsync=false&"
|
||||||
|
+ "jms.watchTopicAdvisories=false&"
|
||||||
|
+ "waitForStart=200&create=false";
|
||||||
|
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
|
||||||
|
|
||||||
|
final Object sendMutex = new Object();
|
||||||
|
final String[] cargos = new String[500];
|
||||||
|
|
||||||
|
int transRover = 0;
|
||||||
|
int messageRover = 0;
|
||||||
|
public volatile int committingTransaction = -1;
|
||||||
|
|
||||||
|
public Server() {
|
||||||
|
super("Server");
|
||||||
|
setPriority(Thread.MIN_PRIORITY);
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
processLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
send();
|
||||||
|
} finally {
|
||||||
|
processLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
exit("Server.run failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void send() throws JMSException {
|
||||||
|
// do not create new clients now
|
||||||
|
// ToDo: Test this case later.
|
||||||
|
synchronized (sendMutex) {
|
||||||
|
int trans = ++transRover;
|
||||||
|
boolean relevantTrans = true; //random(2) > 1;
|
||||||
|
ClientType clientType = relevantTrans ? ClientType
|
||||||
|
.randomClientType() : null; // sends this types
|
||||||
|
//int count = random(500, 700);
|
||||||
|
int count = 1000;
|
||||||
|
|
||||||
|
LOG.info("Sending Trans[id=" + trans + ", count="
|
||||||
|
+ count + ", clientType=" + clientType + ", firstID=" + (messageRover+1) + "]");
|
||||||
|
|
||||||
|
Connection con = cf.createConnection();
|
||||||
|
Session sess = con
|
||||||
|
.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageProducer prod = sess.createProducer(null);
|
||||||
|
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Message message = sess.createMessage();
|
||||||
|
message.setIntProperty("ID", ++messageRover);
|
||||||
|
message.setIntProperty("TRANS", trans);
|
||||||
|
String type = clientType != null ? clientType
|
||||||
|
.randomMessageType() : ClientType
|
||||||
|
.randomNonRelevantMessageType();
|
||||||
|
message.setStringProperty("TYPE", type);
|
||||||
|
|
||||||
|
if (CARGO_SIZE > 0)
|
||||||
|
message.setStringProperty("CARGO",
|
||||||
|
getCargo(random(CARGO_SIZE)));
|
||||||
|
|
||||||
|
prod.send(topic, message);
|
||||||
|
clientManager.onServerMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Message message = sess.createMessage();
|
||||||
|
message.setIntProperty("ID", ++messageRover);
|
||||||
|
message.setIntProperty("TRANS", trans);
|
||||||
|
message.setBooleanProperty("COMMIT", true);
|
||||||
|
message.setBooleanProperty("RELEVANT", relevantTrans);
|
||||||
|
prod.send(topic, message);
|
||||||
|
clientManager.onServerMessage(message);
|
||||||
|
|
||||||
|
committingTransaction = trans;
|
||||||
|
sess.commit();
|
||||||
|
committingTransaction = -1;
|
||||||
|
|
||||||
|
LOG.info("Committed Trans[id=" + trans + ", count="
|
||||||
|
+ count + ", clientType=" + clientType + "], ID=" + messageRover);
|
||||||
|
|
||||||
|
sess.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getCargo(int length) {
|
||||||
|
if (length == 0)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
if (length < cargos.length) {
|
||||||
|
String result = cargos[length];
|
||||||
|
if (result == null) {
|
||||||
|
result = getCargoImpl(length);
|
||||||
|
cargos[length] = result;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
return getCargoImpl(length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getCargoImpl(int length) {
|
||||||
|
StringBuilder sb = new StringBuilder(length);
|
||||||
|
for (int i = length; --i >= 0;) {
|
||||||
|
sb.append('a');
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clients listen on different messages in the topic. The 'TYPE' property
|
||||||
|
* helps the client to select the proper messages.
|
||||||
|
*/
|
||||||
|
private enum ClientType {
|
||||||
|
A("a", "b", "c"), B("c", "d", "e"), C("d", "e", "f"), D("g", "h");
|
||||||
|
|
||||||
|
public final String[] messageTypes;
|
||||||
|
public final HashSet<String> messageTypeSet;
|
||||||
|
public final String selector;
|
||||||
|
|
||||||
|
ClientType(String... messageTypes) {
|
||||||
|
this.messageTypes = messageTypes;
|
||||||
|
messageTypeSet = new HashSet<String>(Arrays.asList(messageTypes));
|
||||||
|
|
||||||
|
StringBuilder sb = new StringBuilder("TYPE in (");
|
||||||
|
for (int i = 0; i < messageTypes.length; i++) {
|
||||||
|
if (i > 0)
|
||||||
|
sb.append(", ");
|
||||||
|
sb.append('\'').append(messageTypes[i]).append('\'');
|
||||||
|
}
|
||||||
|
sb.append(')');
|
||||||
|
selector = sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ClientType randomClientType() {
|
||||||
|
return values()[DurableSubProcessConcurrentCommitActivateNoDuplicateTest
|
||||||
|
.random(values().length - 1)];
|
||||||
|
}
|
||||||
|
|
||||||
|
public final String randomMessageType() {
|
||||||
|
return messageTypes[DurableSubProcessConcurrentCommitActivateNoDuplicateTest
|
||||||
|
.random(messageTypes.length - 1)];
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String randomNonRelevantMessageType() {
|
||||||
|
return Integer
|
||||||
|
.toString(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.random(20));
|
||||||
|
}
|
||||||
|
|
||||||
|
public final boolean isRelevant(String messageType) {
|
||||||
|
return messageTypeSet.contains(messageType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final String toString() {
|
||||||
|
return this.name() /* + '[' + selector + ']' */;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates new cliens.
|
||||||
|
*/
|
||||||
|
private final class ClientManager extends Thread {
|
||||||
|
|
||||||
|
private int clientRover = 0;
|
||||||
|
|
||||||
|
private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
|
||||||
|
|
||||||
|
private boolean end;
|
||||||
|
|
||||||
|
public ClientManager() {
|
||||||
|
super("ClientManager");
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setEnd(boolean end) {
|
||||||
|
this.end = end;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
if (clients.size() < MAX_CLIENTS && !end) {
|
||||||
|
processLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
createNewClient();
|
||||||
|
} finally {
|
||||||
|
processLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = clients.size();
|
||||||
|
//sleepRandom(1000, 4000);
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
exit("ClientManager.run failed.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createNewClient() throws JMSException {
|
||||||
|
ClientType type = ClientType.randomClientType();
|
||||||
|
|
||||||
|
Client client;
|
||||||
|
synchronized (server.sendMutex) {
|
||||||
|
client = new Client(++clientRover, type, CLIENT_LIFETIME,
|
||||||
|
CLIENT_ONLINE, CLIENT_OFFLINE);
|
||||||
|
clients.add(client);
|
||||||
|
}
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
LOG.info(client.toString() + " created. " + this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeClient(Client client) {
|
||||||
|
clients.remove(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onServerMessage(Message message) throws JMSException {
|
||||||
|
for (Client client : clients) {
|
||||||
|
client.onServerMessage(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder("ClientManager[count=");
|
||||||
|
sb.append(clients.size());
|
||||||
|
sb.append(", clients=");
|
||||||
|
boolean sep = false;
|
||||||
|
for (Client client : clients) {
|
||||||
|
if (sep)
|
||||||
|
sb.append(", ");
|
||||||
|
else
|
||||||
|
sep = true;
|
||||||
|
sb.append(client.toString());
|
||||||
|
}
|
||||||
|
sb.append(']');
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumes massages from a durable subscription. Goes online/offline
|
||||||
|
* periodically. Checks the incoming messages against the sent messages of
|
||||||
|
* the server.
|
||||||
|
*/
|
||||||
|
private final class Client extends Thread {
|
||||||
|
|
||||||
|
String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?"
|
||||||
|
+ "jms.watchTopicAdvisories=false&"
|
||||||
|
+ "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
|
||||||
|
+ "jms.producerWindowSize=20971520&"
|
||||||
|
+ "jms.copyMessageOnSend=false&"
|
||||||
|
+ "jms.sendAcksAsync=false&"
|
||||||
|
+ "initialReconnectDelay=100&maxReconnectDelay=30000&"
|
||||||
|
+ "useExponentialBackOff=true";
|
||||||
|
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
|
||||||
|
|
||||||
|
public static final String SUBSCRIPTION_NAME = "subscription";
|
||||||
|
|
||||||
|
private final int id;
|
||||||
|
private final String conClientId;
|
||||||
|
|
||||||
|
private final Random lifetime;
|
||||||
|
private final Random online;
|
||||||
|
private final Random offline;
|
||||||
|
|
||||||
|
private final ClientType clientType;
|
||||||
|
private final String selector;
|
||||||
|
|
||||||
|
private final ConcurrentLinkedQueue<Message> waitingList = new ConcurrentLinkedQueue<Message>();
|
||||||
|
private final HashSet<Integer> processed = CHECK_REDELIVERY ? new HashSet<Integer>(
|
||||||
|
10000) : null;
|
||||||
|
|
||||||
|
private ActiveMQMessageConsumer consumer = null;
|
||||||
|
|
||||||
|
public Client(int id, ClientType clientType, Random lifetime,
|
||||||
|
Random online, Random offline) throws JMSException {
|
||||||
|
super("Client" + id);
|
||||||
|
setDaemon(true);
|
||||||
|
|
||||||
|
this.id = id;
|
||||||
|
conClientId = "cli" + id;
|
||||||
|
this.clientType = clientType;
|
||||||
|
selector = "(COMMIT = true and RELEVANT = true) or "
|
||||||
|
+ clientType.selector;
|
||||||
|
|
||||||
|
this.lifetime = lifetime;
|
||||||
|
this.online = online;
|
||||||
|
this.offline = offline;
|
||||||
|
|
||||||
|
subscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
long end = System.currentTimeMillis() + 60000;
|
||||||
|
try {
|
||||||
|
boolean sleep = false;
|
||||||
|
while (true) {
|
||||||
|
long max = end - System.currentTimeMillis();
|
||||||
|
if (max <= 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
/*
|
||||||
|
if (sleep)
|
||||||
|
offline.sleepRandom();
|
||||||
|
else
|
||||||
|
sleep = true;
|
||||||
|
*/
|
||||||
|
|
||||||
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
processLock.readLock().lock();
|
||||||
|
onlineCount.incrementAndGet();
|
||||||
|
try {
|
||||||
|
process(online.next());
|
||||||
|
} finally {
|
||||||
|
onlineCount.decrementAndGet();
|
||||||
|
processLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
|
||||||
|
unsubscribe();
|
||||||
|
else {
|
||||||
|
LOG.info("Client abandon the subscription. "
|
||||||
|
+ this);
|
||||||
|
|
||||||
|
// housekeeper should sweep these abandoned subscriptions
|
||||||
|
houseKeeper.abandonedSubscriptions.add(conClientId);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
exit(toString() + " failed.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
clientManager.removeClient(this);
|
||||||
|
LOG.info(toString() + " DONE.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void process(long millis) throws JMSException {
|
||||||
|
//long end = System.currentTimeMillis() + millis;
|
||||||
|
long end = System.currentTimeMillis() + 200;
|
||||||
|
long hardEnd = end + 20000; // wait to finish the transaction.
|
||||||
|
boolean inTransaction = false;
|
||||||
|
int transCount = 0;
|
||||||
|
|
||||||
|
Connection con = openConnection();
|
||||||
|
Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = (ActiveMQMessageConsumer) sess.createDurableSubscriber(topic,
|
||||||
|
SUBSCRIPTION_NAME, selector, false);
|
||||||
|
LOG.info(toString() + " ONLINE.");
|
||||||
|
try {
|
||||||
|
do {
|
||||||
|
long max = end - System.currentTimeMillis();
|
||||||
|
if (max <= 0) {
|
||||||
|
if (!inTransaction) {
|
||||||
|
LOG.info(toString() + " done after no work!");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
max = hardEnd - System.currentTimeMillis();
|
||||||
|
if (max <= 0)
|
||||||
|
exit("" + this
|
||||||
|
+ " failed: Transaction is not finished.");
|
||||||
|
}
|
||||||
|
|
||||||
|
Message message = consumer.receive(max);
|
||||||
|
if (message == null)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
onClientMessage(message);
|
||||||
|
|
||||||
|
if (message.propertyExists("COMMIT")) {
|
||||||
|
message.acknowledge(); // CLIENT_ACKNOWLEDGE
|
||||||
|
|
||||||
|
int trans = message.getIntProperty("TRANS");
|
||||||
|
LOG.info("Received Trans[id="
|
||||||
|
+ trans + ", count="
|
||||||
|
+ transCount + "] in " + this + ".");
|
||||||
|
|
||||||
|
inTransaction = false;
|
||||||
|
transCount = 0;
|
||||||
|
|
||||||
|
int committing = server.committingTransaction;
|
||||||
|
if (committing == trans) {
|
||||||
|
LOG.info("Going offline during transaction commit. messageID=" + message.getIntProperty("ID"));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
inTransaction = true;
|
||||||
|
transCount++;
|
||||||
|
if (1 == transCount) {
|
||||||
|
LOG.info("In Trans[id=" + message.getIntProperty("TRANS") + "] first ID=" + message.getIntProperty("ID"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (true);
|
||||||
|
} finally {
|
||||||
|
sess.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
LOG.info(toString() + " OFFLINE.");
|
||||||
|
|
||||||
|
// Check if the messages are in the waiting
|
||||||
|
// list for long time.
|
||||||
|
Message topMessage = waitingList.peek();
|
||||||
|
if (topMessage != null)
|
||||||
|
checkDeliveryTime(topMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onServerMessage(Message message) throws JMSException {
|
||||||
|
if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) {
|
||||||
|
if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT")))
|
||||||
|
waitingList.add(message);
|
||||||
|
} else {
|
||||||
|
String messageType = message.getStringProperty("TYPE");
|
||||||
|
if (clientType.isRelevant(messageType))
|
||||||
|
waitingList.add(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onClientMessage(Message message) {
|
||||||
|
Message serverMessage = waitingList.poll();
|
||||||
|
try {
|
||||||
|
Integer receivedId = (Integer) message.getObjectProperty("ID");
|
||||||
|
if (processed != null && processed.contains(receivedId))
|
||||||
|
LOG.info("! Message has been processed before. "
|
||||||
|
+ this + " redeliveredFlag=" + message.getJMSRedelivered() + ", message = " + message);
|
||||||
|
|
||||||
|
if (serverMessage == null)
|
||||||
|
exit(""
|
||||||
|
+ this
|
||||||
|
+ " failed: There is no next server message, but received: "
|
||||||
|
+ message);
|
||||||
|
|
||||||
|
Integer serverId = (Integer) serverMessage
|
||||||
|
.getObjectProperty("ID");
|
||||||
|
if (receivedId == null || serverId == null)
|
||||||
|
exit("" + this + " failed: message ID not found.\r\n"
|
||||||
|
+ " received: " + message + "\r\n" + " server: "
|
||||||
|
+ serverMessage);
|
||||||
|
|
||||||
|
if (!serverId.equals(receivedId)) {
|
||||||
|
StringBuilder missingList = new StringBuilder();
|
||||||
|
Object lastTrans = null;
|
||||||
|
int transCount = 0;
|
||||||
|
Message nextServerMessage = serverMessage;
|
||||||
|
do {
|
||||||
|
Integer nextServerId = (Integer) nextServerMessage.getObjectProperty("ID");
|
||||||
|
if (nextServerId.equals(receivedId)) {
|
||||||
|
if (lastTrans != null)
|
||||||
|
missingList.append("Missing TRANS=").append(lastTrans).append(", size=").append(transCount).append("\r\n");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
Object trans = nextServerMessage.getObjectProperty("TRANS");
|
||||||
|
if (!trans.equals(lastTrans)) {
|
||||||
|
if (lastTrans != null)
|
||||||
|
missingList.append("Missing TRANS=").append(lastTrans).append(", size=").append(transCount).append("\r\n");
|
||||||
|
lastTrans = trans;
|
||||||
|
transCount = 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
transCount++;
|
||||||
|
} while ((nextServerMessage = waitingList.poll()) != null);
|
||||||
|
|
||||||
|
exit("Missing messages!\r\n" + missingList +
|
||||||
|
"Received message: " + message + "\r\n" +
|
||||||
|
"Expected message: " + serverMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
checkDeliveryTime(message);
|
||||||
|
|
||||||
|
if (processed != null)
|
||||||
|
processed.add(receivedId);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
exit("" + this + ".onClientMessage failed.\r\n" + " received: "
|
||||||
|
+ message + "\r\n" + " server: " + serverMessage, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the message was not delivered fast enough.
|
||||||
|
*/
|
||||||
|
public void checkDeliveryTime(Message message) throws JMSException {
|
||||||
|
long creation = message.getJMSTimestamp();
|
||||||
|
long min = System.currentTimeMillis() - (offline.max + online.min)
|
||||||
|
* (BROKER_RESTART > 0 ? 4 : 1);
|
||||||
|
|
||||||
|
if (false && min > creation) {
|
||||||
|
SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
|
||||||
|
exit("" + this + ".checkDeliveryTime failed. Message time: "
|
||||||
|
+ df.format(new Date(creation)) + ", min: "
|
||||||
|
+ df.format(new Date(min)) + "\r\n" + message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Connection openConnection() throws JMSException {
|
||||||
|
Connection con = cf.createConnection();
|
||||||
|
con.setClientID(conClientId);
|
||||||
|
((ActiveMQConnection) con).setCloseTimeout(60000);
|
||||||
|
con.start();
|
||||||
|
return con;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void subscribe() throws JMSException {
|
||||||
|
processLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
Connection con = openConnection();
|
||||||
|
Session session = con
|
||||||
|
.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector,
|
||||||
|
true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
processLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unsubscribe() throws JMSException {
|
||||||
|
processLock.readLock().lock();
|
||||||
|
LOG.info("Unsubscribe: " + this);
|
||||||
|
try {
|
||||||
|
Connection con = openConnection();
|
||||||
|
Session session = con
|
||||||
|
.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.unsubscribe(SUBSCRIPTION_NAME);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
processLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Client[id=" + id + ", type=" + clientType + "] consumerId=" + (consumer != null ? consumer.getConsumerId() : "null");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sweeps out not-used durable subscriptions.
|
||||||
|
*/
|
||||||
|
private final class HouseKeeper extends Thread {
|
||||||
|
|
||||||
|
private HouseKeeper() {
|
||||||
|
super("HouseKeeper");
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(3 * 60 * 1000);
|
||||||
|
|
||||||
|
processLock.readLock().lock();
|
||||||
|
try {
|
||||||
|
sweep();
|
||||||
|
} finally {
|
||||||
|
processLock.readLock().unlock();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
break;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
Exception log = new Exception("HouseKeeper failed.", e);
|
||||||
|
log.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sweep() throws Exception {
|
||||||
|
LOG.info("Housekeeper sweeping.");
|
||||||
|
|
||||||
|
int closed = 0;
|
||||||
|
ArrayList<String> sweeped = new ArrayList<String>();
|
||||||
|
try {
|
||||||
|
for (String clientId : abandonedSubscriptions) {
|
||||||
|
LOG.info("Sweeping out subscription of "
|
||||||
|
+ clientId + ".");
|
||||||
|
broker.getAdminView().destroyDurableSubscriber(clientId,
|
||||||
|
Client.SUBSCRIPTION_NAME);
|
||||||
|
sweeped.add(clientId);
|
||||||
|
closed++;
|
||||||
|
}
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
LOG.info("Ex on destroy sub " + ignored);
|
||||||
|
} finally {
|
||||||
|
abandonedSubscriptions.removeAll(sweeped);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Housekeeper sweeped out " + closed
|
||||||
|
+ " subscriptions.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int random(int max) {
|
||||||
|
return (int) (Math.random() * (max + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int random(int min, int max) {
|
||||||
|
return random(max - min) + min;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void sleepRandom(int maxMillis) throws InterruptedException {
|
||||||
|
Thread.sleep(random(maxMillis));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void sleepRandom(int minMillis, int maxMillis)
|
||||||
|
throws InterruptedException {
|
||||||
|
Thread.sleep(random(minMillis, maxMillis));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class Random {
|
||||||
|
|
||||||
|
final int min;
|
||||||
|
final int max;
|
||||||
|
|
||||||
|
Random(int min, int max) {
|
||||||
|
this.min = min;
|
||||||
|
this.max = max;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int next() {
|
||||||
|
return random(min, max);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sleepRandom() throws InterruptedException {
|
||||||
|
DurableSubProcessConcurrentCommitActivateNoDuplicateTest.sleepRandom(min, max);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void exit(String message) {
|
||||||
|
exit(message, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void exit(String message, Throwable e) {
|
||||||
|
Throwable cause = new RuntimeException(message, e);
|
||||||
|
LOG.error(message, cause);
|
||||||
|
exceptions.add(cause);
|
||||||
|
ThreadTracker.result();
|
||||||
|
//fail(cause.toString());
|
||||||
|
System.exit(-9);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
topic = new ActiveMQTopic("TopicT");
|
||||||
|
startBroker();
|
||||||
|
|
||||||
|
clientManager = new ClientManager();
|
||||||
|
server = new Server();
|
||||||
|
houseKeeper = new HouseKeeper();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
destroyBroker();
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum Persistence {
|
||||||
|
MEMORY, AMQ, KAHA, KAHADB
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startBroker() throws Exception {
|
||||||
|
startBroker(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startBroker(boolean deleteAllMessages) throws Exception {
|
||||||
|
if (broker != null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
|
||||||
|
broker.setBrokerName(getName());
|
||||||
|
broker.setAdvisorySupport(false);
|
||||||
|
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||||
|
|
||||||
|
switch (PERSISTENT_ADAPTER) {
|
||||||
|
case MEMORY:
|
||||||
|
broker.setPersistent(false);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case AMQ:
|
||||||
|
File amqData = new File("activemq-data/" + getName() + "-amq");
|
||||||
|
if (deleteAllMessages)
|
||||||
|
delete(amqData);
|
||||||
|
|
||||||
|
broker.setPersistent(true);
|
||||||
|
AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
|
||||||
|
amq.setDirectory(amqData);
|
||||||
|
broker.setPersistenceAdapter(amq);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case KAHA:
|
||||||
|
File kahaData = new File("activemq-data/" + getName() + "-kaha");
|
||||||
|
if (deleteAllMessages)
|
||||||
|
delete(kahaData);
|
||||||
|
|
||||||
|
broker.setPersistent(true);
|
||||||
|
KahaPersistenceAdapter kaha = new KahaPersistenceAdapter();
|
||||||
|
kaha.setDirectory(kahaData);
|
||||||
|
broker.setPersistenceAdapter(kaha);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case KAHADB:
|
||||||
|
File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
|
||||||
|
if (deleteAllMessages)
|
||||||
|
delete(kahadbData);
|
||||||
|
|
||||||
|
broker.setPersistent(true);
|
||||||
|
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
|
||||||
|
kahadb.setDirectory(kahadbData);
|
||||||
|
kahadb.setJournalMaxFileLength(5 * 1024 * 1024);
|
||||||
|
broker.setPersistenceAdapter(kahadb);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
broker.addConnector("tcp://localhost:61656");
|
||||||
|
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
|
||||||
|
broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
|
||||||
|
broker.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 1024);
|
||||||
|
|
||||||
|
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry defaultEntry = new PolicyEntry();
|
||||||
|
defaultEntry.setMaxAuditDepth(20000);
|
||||||
|
policyMap.setDefaultEntry(defaultEntry);
|
||||||
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
broker.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String getName() {
|
||||||
|
return "DurableSubProcessWithRestartTest";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean delete(File path) {
|
||||||
|
if (path == null)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (path.isDirectory()) {
|
||||||
|
for (File file : path.listFiles()) {
|
||||||
|
delete(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return path.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void destroyBroker() throws Exception {
|
||||||
|
if (broker == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
broker.stop();
|
||||||
|
broker = null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -214,7 +214,7 @@ public class DurableSubscriberWithNetworkDisconnectTest extends JmsMultipleBroke
|
||||||
}
|
}
|
||||||
|
|
||||||
if (failover) {
|
if (failover) {
|
||||||
options += "?maxReconnectAttempts=1)";
|
options += "?maxReconnectAttempts=0)";
|
||||||
}
|
}
|
||||||
|
|
||||||
options += "?useExponentialBackOff=" + exponentialBackOff;
|
options += "?useExponentialBackOff=" + exponentialBackOff;
|
||||||
|
|
|
@ -16,7 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.usecases;
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
@ -33,9 +38,8 @@ import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
|
||||||
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
|
@ -51,7 +55,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
public boolean keepDurableSubsActive = true;
|
public boolean keepDurableSubsActive = true;
|
||||||
private BrokerService broker;
|
private BrokerService broker;
|
||||||
private ActiveMQTopic topic;
|
private ActiveMQTopic topic;
|
||||||
private Vector<Exception> exceptions = new Vector<Exception>();
|
private Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||||
|
|
||||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
|
||||||
|
@ -962,6 +966,95 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
assertEquals("offline consumer got all", sent, listener.count);
|
assertEquals("offline consumer got all", sent, listener.count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
|
||||||
|
final int messageCount = 1000;
|
||||||
|
Connection con = null;
|
||||||
|
Session session = null;
|
||||||
|
final int numConsumers = 10;
|
||||||
|
for (int i = 0; i <= numConsumers; i++) {
|
||||||
|
con = createConnection("cli" + i);
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, "SubsId", null, true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
class CheckForDupsClient implements Runnable {
|
||||||
|
HashSet<Long> ids = new HashSet<Long>();
|
||||||
|
final int id;
|
||||||
|
|
||||||
|
public CheckForDupsClient(int id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Connection con = createConnection("cli" + id);
|
||||||
|
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
for (int j=0;j<2;j++) {
|
||||||
|
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
|
||||||
|
for (int i = 0; i < messageCount/2; i++) {
|
||||||
|
Message message = consumer.receive(4000);
|
||||||
|
assertNotNull(message);
|
||||||
|
long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
|
||||||
|
assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify no duplicates left
|
||||||
|
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
|
||||||
|
Message message = consumer.receive(4000);
|
||||||
|
if (message != null) {
|
||||||
|
long producerSequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
|
||||||
|
assertTrue("ID=" + id + " not a duplicate: " + producerSequenceId, ids.add(producerSequenceId));
|
||||||
|
}
|
||||||
|
assertNull(message);
|
||||||
|
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
final String payLoad = new String(new byte[1000]);
|
||||||
|
con = createConnection();
|
||||||
|
final Session sendSession = con.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageProducer producer = sendSession.createProducer(topic);
|
||||||
|
for (int i = 0; i < messageCount; i++) {
|
||||||
|
producer.send(sendSession.createTextMessage(payLoad));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
|
// concurrent commit and activate
|
||||||
|
executorService.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sendSession.commit();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
for (int i = 0; i < numConsumers; i++) {
|
||||||
|
executorService.execute(new CheckForDupsClient(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
executorService.shutdown();
|
||||||
|
executorService.awaitTermination(5, TimeUnit.MINUTES);
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
|
public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
|
||||||
// create offline subs 1
|
// create offline subs 1
|
||||||
|
@ -1292,6 +1385,120 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testRedeliveryFlag() throws Exception {
|
||||||
|
|
||||||
|
Connection con;
|
||||||
|
Session session;
|
||||||
|
final int numClients = 2;
|
||||||
|
for (int i=0; i<numClients; i++) {
|
||||||
|
con = createConnection("cliId" + i);
|
||||||
|
session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
final Random random = new Random();
|
||||||
|
|
||||||
|
// send messages
|
||||||
|
con = createConnection();
|
||||||
|
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(null);
|
||||||
|
|
||||||
|
final int count = 1000;
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Message message = session.createMessage();
|
||||||
|
message.setStringProperty("filter", "true");
|
||||||
|
producer.send(topic, message);
|
||||||
|
}
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
class Client implements Runnable {
|
||||||
|
Connection con;
|
||||||
|
Session session;
|
||||||
|
String clientId;
|
||||||
|
Client(String id) {
|
||||||
|
this.clientId = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
MessageConsumer consumer = null;
|
||||||
|
Message message = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = -1; i < random.nextInt(10); i++) {
|
||||||
|
// go online and take none
|
||||||
|
con = createConnection(clientId);
|
||||||
|
session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// consume 1
|
||||||
|
con = createConnection(clientId);
|
||||||
|
session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
message = consumer.receive(4000);
|
||||||
|
assertNotNull("got message", message);
|
||||||
|
// it is not reliable as it depends on broker dispatch rather than client receipt
|
||||||
|
// and delivered ack
|
||||||
|
// assertFalse("not redelivered", message.getJMSRedelivered());
|
||||||
|
message.acknowledge();
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
// peek all
|
||||||
|
for (int j = 0; j < random.nextInt(10); j++) {
|
||||||
|
con = createConnection(clientId);
|
||||||
|
session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
|
||||||
|
for (int i = 0; i < count - 1; i++) {
|
||||||
|
assertNotNull("got message", consumer.receive(4000));
|
||||||
|
}
|
||||||
|
// no ack
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// consume remaining
|
||||||
|
con = createConnection(clientId);
|
||||||
|
session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
|
||||||
|
for (int i = 0; i < count - 1; i++) {
|
||||||
|
message = consumer.receive(4000);
|
||||||
|
assertNotNull("got message", message);
|
||||||
|
assertTrue("is redelivered", message.getJMSRedelivered());
|
||||||
|
}
|
||||||
|
message.acknowledge();
|
||||||
|
session.close();
|
||||||
|
con.close();
|
||||||
|
|
||||||
|
con = createConnection(clientId);
|
||||||
|
session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||||
|
assertNull("no message left", consumer.receive(2000));
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
throwable.printStackTrace();
|
||||||
|
exceptions.add(throwable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
|
for (int i=0; i<numClients; i++) {
|
||||||
|
executorService.execute(new Client("cliId" + i));
|
||||||
|
}
|
||||||
|
executorService.shutdown();
|
||||||
|
executorService.awaitTermination(10, TimeUnit.MINUTES);
|
||||||
|
assertTrue("No exceptions", exceptions.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
public static class Listener implements MessageListener {
|
public static class Listener implements MessageListener {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
String id = null;
|
String id = null;
|
||||||
|
|
|
@ -32,12 +32,15 @@ import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.kahadb.page.PageFile;
|
import org.apache.kahadb.page.PageFile;
|
||||||
|
import org.apache.kahadb.page.Transaction;
|
||||||
import org.apache.kahadb.util.LongMarshaller;
|
import org.apache.kahadb.util.LongMarshaller;
|
||||||
import org.apache.kahadb.util.StringMarshaller;
|
import org.apache.kahadb.util.StringMarshaller;
|
||||||
import org.apache.kahadb.util.VariableMarshaller;
|
import org.apache.kahadb.util.VariableMarshaller;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class BTreeIndexTest extends IndexTestSupport {
|
public class BTreeIndexTest extends IndexTestSupport {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(BTreeIndexTest.class);
|
||||||
private NumberFormat nf;
|
private NumberFormat nf;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -127,7 +130,7 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testIteration() throws Exception {
|
public void testIteration() throws Exception {
|
||||||
createPageFileAndIndex(100);
|
createPageFileAndIndex(500);
|
||||||
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
|
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
|
||||||
this.index.load(tx);
|
this.index.load(tx);
|
||||||
tx.commit();
|
tx.commit();
|
||||||
|
@ -140,6 +143,8 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
this.index.load(tx);
|
this.index.load(tx);
|
||||||
tx.commit();
|
tx.commit();
|
||||||
|
|
||||||
|
exerciseAnotherIndex(tx);
|
||||||
|
|
||||||
// BTree should iterate it in sorted order.
|
// BTree should iterate it in sorted order.
|
||||||
int counter=0;
|
int counter=0;
|
||||||
for (Iterator<Map.Entry<String,Long>> i = index.iterator(tx); i.hasNext();) {
|
for (Iterator<Map.Entry<String,Long>> i = index.iterator(tx); i.hasNext();) {
|
||||||
|
@ -189,18 +194,77 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
createPageFileAndIndex(100);
|
createPageFileAndIndex(100);
|
||||||
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
|
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
|
||||||
this.index.load(tx);
|
this.index.load(tx);
|
||||||
|
|
||||||
|
long id = tx.allocate().getPageId();
|
||||||
tx.commit();
|
tx.commit();
|
||||||
|
|
||||||
final int count = 4000;
|
BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
|
||||||
doInsert(count);
|
sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
sindex.setValueMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
sindex.load(tx);
|
||||||
|
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
final int count = 5000;
|
||||||
|
|
||||||
|
String payload = new String(new byte[2]);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
index.put(tx, key(i), (long)i);
|
||||||
|
sindex.put(tx, key(i), String.valueOf(i) + payload);
|
||||||
|
tx.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Random rand = new Random(System.currentTimeMillis());
|
Random rand = new Random(System.currentTimeMillis());
|
||||||
int i = 0, prev = 0;
|
int i = 0, prev = 0;
|
||||||
while (!index.isEmpty(tx)) {
|
while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) {
|
||||||
prev = i;
|
prev = i;
|
||||||
i = rand.nextInt(count);
|
i = rand.nextInt(count);
|
||||||
try {
|
try {
|
||||||
index.remove(tx, key(i));
|
index.remove(tx, key(i));
|
||||||
|
sindex.remove(tx, key(i));
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRandomAddRemove() throws Exception {
|
||||||
|
|
||||||
|
createPageFileAndIndex(1024);
|
||||||
|
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
|
||||||
|
this.index.load(tx);
|
||||||
|
|
||||||
|
long id = tx.allocate().getPageId();
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
|
||||||
|
sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
sindex.setValueMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
sindex.load(tx);
|
||||||
|
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
Random rand = new Random(System.currentTimeMillis());
|
||||||
|
final int count = 50000;
|
||||||
|
|
||||||
|
String payload = new String(new byte[200]);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
int insertIndex = rand.nextInt(count);
|
||||||
|
index.put(tx, key(insertIndex), (long)insertIndex);
|
||||||
|
sindex.put(tx, key(insertIndex), String.valueOf(insertIndex) + payload);
|
||||||
|
tx.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int i = 0, prev = 0;
|
||||||
|
while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) {
|
||||||
|
prev = i;
|
||||||
|
i = rand.nextInt(count);
|
||||||
|
try {
|
||||||
|
index.remove(tx, key(i));
|
||||||
|
sindex.remove(tx, key(i));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
|
fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
|
||||||
|
@ -219,13 +283,46 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
|
|
||||||
index.remove(tx, key(3697));
|
index.remove(tx, key(3697));
|
||||||
index.remove(tx, key(1566));
|
index.remove(tx, key(1566));
|
||||||
|
|
||||||
|
tx.commit();
|
||||||
|
index.clear(tx);
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
doInsert(count);
|
||||||
|
|
||||||
|
Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx, key(1345));
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Map.Entry<String, Long> val = iterator.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
doRemoveBackwards(666);
|
||||||
|
Map.Entry<String, Long> first = index.getFirst(tx);
|
||||||
|
assertEquals(first.getValue(), Long.valueOf(666L));
|
||||||
|
|
||||||
|
for (int i=0; i<2000; i++) {
|
||||||
|
Map.Entry<String, Long> last = index.getLast(tx);
|
||||||
|
index.remove(tx, last.getKey());
|
||||||
|
tx.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
exerciseAnotherIndex(tx);
|
||||||
|
|
||||||
|
iterator = index.iterator(tx, key(100));
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Map.Entry<String, Long> val = iterator.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map.Entry<String, Long> last = index.getLast(tx);
|
||||||
|
assertEquals(last.getValue(), Long.valueOf(1999L));
|
||||||
|
index.clear(tx);
|
||||||
|
assertNull(index.getLast(tx));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testLargeValue() throws Exception {
|
public void testLargeValue() throws Exception {
|
||||||
//System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
|
//System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
|
||||||
pf = new PageFile(directory, getClass().getName());
|
pf = new PageFile(directory, getClass().getName());
|
||||||
pf.setPageSize(4*1024);
|
pf.setPageSize(4*1024);
|
||||||
pf.setEnablePageCaching(false);
|
//pf.setEnablePageCaching(false);
|
||||||
pf.load();
|
pf.load();
|
||||||
tx = pf.tx();
|
tx = pf.tx();
|
||||||
long id = tx.allocate().getPageId();
|
long id = tx.allocate().getPageId();
|
||||||
|
@ -262,6 +359,7 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
tx.commit();
|
tx.commit();
|
||||||
tx = pf.tx();
|
tx = pf.tx();
|
||||||
for (long i=0; i<numMessages; i++) {
|
for (long i=0; i<numMessages; i++) {
|
||||||
|
assertTrue(test.containsKey(tx, i));
|
||||||
test.get(tx, i);
|
test.get(tx, i);
|
||||||
}
|
}
|
||||||
tx.commit();
|
tx.commit();
|
||||||
|
@ -270,7 +368,6 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
public void testLargeValueOverflow() throws Exception {
|
public void testLargeValueOverflow() throws Exception {
|
||||||
pf = new PageFile(directory, getClass().getName());
|
pf = new PageFile(directory, getClass().getName());
|
||||||
pf.setPageSize(4*1024);
|
pf.setPageSize(4*1024);
|
||||||
pf.setEnablePageCaching(false);
|
|
||||||
pf.setWriteBatchSize(1);
|
pf.setWriteBatchSize(1);
|
||||||
pf.load();
|
pf.load();
|
||||||
tx = pf.tx();
|
tx = pf.tx();
|
||||||
|
@ -292,14 +389,106 @@ public class BTreeIndexTest extends IndexTestSupport {
|
||||||
}
|
}
|
||||||
tx.commit();
|
tx.commit();
|
||||||
|
|
||||||
|
exerciseAnotherIndex(tx);
|
||||||
|
|
||||||
tx = pf.tx();
|
tx = pf.tx();
|
||||||
for (long i=0; i<numMessages; i++) {
|
for (long i=0; i<numMessages; i++) {
|
||||||
|
assertTrue(test.containsKey(tx, i));
|
||||||
String s = test.get(tx, i);
|
String s = test.get(tx, i);
|
||||||
assertEquals("len is as expected", stringSize, s.length());
|
assertEquals("len is as expected", stringSize, s.length());
|
||||||
}
|
}
|
||||||
tx.commit();
|
tx.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void exerciseAnotherIndex(Transaction tx) throws Exception {
|
||||||
|
long id = tx.allocate().getPageId();
|
||||||
|
|
||||||
|
ListIndex<String, String> test = new ListIndex<String, String>(pf, id);
|
||||||
|
test.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
test.setValueMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
test.load(tx);
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
final int count = 10000;
|
||||||
|
|
||||||
|
String payload = new String(new byte[1]);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
test.put(tx, key(i), String.valueOf(i) + payload);
|
||||||
|
}
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
test.clear(tx);
|
||||||
|
tx.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testListIndexConsistancyOverTime() throws Exception {
|
||||||
|
|
||||||
|
final int NUM_ITERATIONS = 50;
|
||||||
|
|
||||||
|
pf = new PageFile(directory, getClass().getName());
|
||||||
|
pf.setPageSize(4*1024);
|
||||||
|
//pf.setEnablePageCaching(false);
|
||||||
|
pf.setWriteBatchSize(1);
|
||||||
|
pf.load();
|
||||||
|
tx = pf.tx();
|
||||||
|
long id = tx.allocate().getPageId();
|
||||||
|
|
||||||
|
ListIndex<String, String> test = new ListIndex<String, String>(pf, id);
|
||||||
|
test.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
test.setValueMarshaller(StringMarshaller.INSTANCE);
|
||||||
|
test.load(tx);
|
||||||
|
tx.commit();
|
||||||
|
|
||||||
|
int expectedListEntries = 0;
|
||||||
|
int nextSequenceId = 0;
|
||||||
|
|
||||||
|
LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entires and sparsely populating the sequences.");
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_ITERATIONS; ++i) {
|
||||||
|
test.add(tx, String.valueOf(expectedListEntries++), new String("AA"));
|
||||||
|
|
||||||
|
for (int j = 0; j < expectedListEntries; j++) {
|
||||||
|
|
||||||
|
String sequenceSet = test.get(tx, String.valueOf(j));
|
||||||
|
|
||||||
|
int startSequenceId = nextSequenceId;
|
||||||
|
for (int ix = 0; ix < NUM_ITERATIONS; ix++) {
|
||||||
|
sequenceSet.concat(String.valueOf(nextSequenceId++));
|
||||||
|
test.put(tx, String.valueOf(j), sequenceSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
sequenceSet = test.get(tx, String.valueOf(j));
|
||||||
|
|
||||||
|
for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) {
|
||||||
|
//sequenceSet.remove(startSequenceId++);
|
||||||
|
test.put(tx, String.valueOf(j), String.valueOf(j));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exerciseAnotherIndex(tx);
|
||||||
|
LOG.info("Checking if Index has the expected number of entries.");
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_ITERATIONS; ++i) {
|
||||||
|
assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
|
||||||
|
assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Index has the expected number of entries.");
|
||||||
|
|
||||||
|
assertEquals(expectedListEntries, test.size());
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_ITERATIONS; ++i) {
|
||||||
|
LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
|
||||||
|
|
||||||
|
assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
|
||||||
|
assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
|
||||||
|
LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
|
||||||
|
|
||||||
|
assertEquals(expectedListEntries - (i + 1), test.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void doInsertReverse(int count) throws Exception {
|
void doInsertReverse(int count) throws Exception {
|
||||||
for (int i = count-1; i >= 0; i--) {
|
for (int i = count-1; i >= 0; i--) {
|
||||||
index.put(tx, key(i), (long)i);
|
index.put(tx, key(i), (long)i);
|
||||||
|
|
Loading…
Reference in New Issue