[AMQ-9234] Remove JournalJDBC store and activeio dependency (#993)

This commit is contained in:
Matt Pavlovich 2023-03-29 08:38:25 -05:00 committed by GitHub
parent 1fdccd9cf4
commit 5ca71dfee6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 44 additions and 2675 deletions

View File

@ -49,10 +49,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<!-- geronimo -->
<dependency>

View File

@ -53,11 +53,6 @@
<artifactId>derbytools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>true</optional>
</dependency>
<!-- =============================== -->
<!-- Testing Dependencies -->

View File

@ -1,429 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
*
*/
public class JournalMessageStore extends AbstractMessageStore {
private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
protected final JournalPersistenceAdapter peristenceAdapter;
protected final JournalTransactionStore transactionStore;
protected final MessageStore longTermStore;
protected final TransactionTemplate transactionTemplate;
protected RecordLocation lastLocation;
protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
private Map<MessageId, Message> cpAddedMessageIds;
private MemoryUsage memoryUsage;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
super(destination);
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
this.memoryUsage=memoryUsage;
longTermStore.setMemoryUsage(memoryUsage);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public void addMessage(final ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final boolean debug = LOG.isDebugEnabled();
message.incrementReferenceCount();
final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if (!context.isInTransaction()) {
if (debug) {
LOG.debug("Journalled message add for: " + id + ", at: " + location);
}
addMessage(context, message, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
}
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
addMessage(context, message, location);
}
}
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
message.decrementReferenceCount();
}
});
}
}
void addMessage(ConnectionContext context, final Message message, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = message.getMessageId();
messages.put(id, message);
message.getMessageId().setFutureOrSequenceLong(0l);
if (indexListener != null) {
indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
}
}
}
public void replayAddMessage(ConnectionContext context, Message message) {
try {
// Only add the message if it has not already been added.
Message t = longTermStore.getMessage(message.getMessageId());
if (t == null) {
longTermStore.addMessage(context, message);
}
} catch (Throwable e) {
LOG.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
}
}
/**
*/
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if (!context.isInTransaction()) {
if (debug) {
LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
removeMessage(ack, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
}
}
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
});
}
}
final void removeMessage(final MessageAck ack, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
Message message = messages.remove(id);
if (message == null) {
messageAcks.add(ack);
} else {
message.decrementReferenceCount();
}
}
}
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
Message t = longTermStore.getMessage(messageAck.getLastMessageId());
if (t != null) {
longTermStore.removeMessage(context, messageAck);
}
} catch (Throwable e) {
LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
}
/**
* @return
* @throws IOException
*/
public RecordLocation checkpoint() throws IOException {
return checkpoint(null);
}
/**
* @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
final List<MessageAck> cpRemovedMessageLocations;
final List<RecordLocation> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
// swap out the message hash maps..
synchronized (this) {
cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, Message>();
this.messageAcks = new ArrayList<MessageAck>();
}
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
int size = 0;
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages.
synchronized (JournalMessageStore.this) {
Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
while (iterator.hasNext()) {
Message message = iterator.next();
try {
longTermStore.addMessage(context, message);
} catch (Throwable e) {
LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
size += message.getSize();
message.decrementReferenceCount();
// Commit the batch if it's getting too big
if (size >= maxCheckpointMessageAddSize) {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
size = 0;
}
}
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
while (iterator.hasNext()) {
try {
MessageAck ack = iterator.next();
longTermStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
if (postCheckpointTest != null) {
postCheckpointTest.execute();
}
}
});
synchronized (this) {
cpAddedMessageIds = null;
}
if (cpActiveJournalLocations.size() > 0) {
Collections.sort(cpActiveJournalLocations);
return cpActiveJournalLocations.get(0);
}
synchronized (this) {
return lastLocation;
}
}
/**
*
*/
public Message getMessage(MessageId identity) throws IOException {
Message answer = null;
synchronized (this) {
// Do we have a still have it in the journal?
answer = messages.get(identity);
if (answer == null && cpAddedMessageIds != null) {
answer = cpAddedMessageIds.get(identity);
}
}
if (answer != null) {
return answer;
}
// If all else fails try the long term message store.
return longTermStore.getMessage(identity);
}
/**
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.recover(listener);
}
public void start() throws Exception {
if (this.memoryUsage != null) {
this.memoryUsage.addUsageListener(peristenceAdapter);
}
longTermStore.start();
}
public void stop() throws Exception {
longTermStore.stop();
if (this.memoryUsage != null) {
this.memoryUsage.removeUsageListener(peristenceAdapter);
}
}
/**
* @return Returns the longTermStore.
*/
public MessageStore getLongTermMessageStore() {
return longTermStore;
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException {
peristenceAdapter.checkpoint(true, true);
longTermStore.removeAllMessages(context);
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("The journal does not support message references.");
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException {
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount();
}
public long getMessageSize() throws IOException {
peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageSize();
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(maxReturned, listener);
}
public void resetBatching() {
longTermStore.resetBatching();
}
@Override
public void setBatch(MessageId messageId) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.setBatch(messageId);
}
}

View File

@ -1,816 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
*
* @deprecated - Deprecated for removal as this PersistenceAdapter is no longer used and
* replaced by the JDBCPersistenceAdapter.
*
* @org.apache.xbean.XBean
*
*/
@Deprecated(forRemoval = true)
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
private BrokerService brokerService;
protected Scheduler scheduler;
private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
private Journal journal;
private PersistenceAdapter longTermPersistence;
private final WireFormat wireFormat = new OpenWireFormat();
private final ConcurrentMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
private final ConcurrentMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private SystemUsage usageManager;
private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 1024 * 1024;
private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
private boolean fullCheckPoint;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
private TaskRunnerFactory taskRunnerFactory;
private File directory;
public JournalPersistenceAdapter() {
}
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
setJournal(journal);
setTaskRunnerFactory(taskRunnerFactory);
setPersistenceAdapter(longTermPersistence);
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
public void setJournal(Journal journal) {
this.journal = journal;
journal.setJournalEventListener(this);
}
public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
this.longTermPersistence = longTermPersistence;
}
final Runnable createPeriodicCheckpointTask() {
return new Runnable() {
@Override
public void run() {
long lastTime = 0;
synchronized (this) {
lastTime = lastCheckpointRequest;
}
if (System.currentTimeMillis() > lastTime + checkpointInterval) {
checkpoint(false, true);
}
}
};
}
/**
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
@Override
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
}
@Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
destinations.addAll(queues.keySet());
destinations.addAll(topics.keySet());
return destinations;
}
private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
if (destination.isQueue()) {
return createQueueMessageStore((ActiveMQQueue)destination);
} else {
return createTopicMessageStore((ActiveMQTopic)destination);
}
}
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
JournalMessageStore store = queues.get(destination);
if (store == null) {
MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
store = new JournalMessageStore(this, checkpointStore, destination);
queues.put(destination, store);
}
return store;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
JournalTopicMessageStore store = topics.get(destinationName);
if (store == null) {
TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
topics.put(destinationName, store);
}
return store;
}
/**
* Cleanup method to remove any state associated with the given destination
*
* @param destination Destination to forget
*/
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
}
/**
* Cleanup method to remove any state associated with the given destination
*
* @param destination Destination to forget
*/
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
@Override
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
@Override
public long getLastMessageBrokerSequenceId() throws IOException {
return longTermPersistence.getLastMessageBrokerSequenceId();
}
@Override
public void beginTransaction(ConnectionContext context) throws IOException {
longTermPersistence.beginTransaction(context);
}
@Override
public void commitTransaction(ConnectionContext context) throws IOException {
longTermPersistence.commitTransaction(context);
}
@Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
longTermPersistence.rollbackTransaction(context);
}
@Override
public synchronized void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
}
if( brokerService!=null ) {
wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
@Override
public boolean iterate() {
return doCheckpoint();
}
}, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
return t;
}
});
// checkpointExecutor.allowCoreThreadTimeOut(true);
this.usageManager.getMemoryUsage().addUsageListener(this);
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// Disabled periodic clean up as it deadlocks with the checkpoint
// operations.
((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
}
longTermPersistence.start();
createTransactionStore();
recover();
// Do a checkpoint periodically.
this.scheduler = new Scheduler("Journal Scheduler");
this.scheduler.start();
this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
}
@Override
public void stop() throws Exception {
this.usageManager.getMemoryUsage().removeUsageListener(this);
if (!started.compareAndSet(true, false)) {
return;
}
this.scheduler.cancel(periodicCheckpointTask);
this.scheduler.stop();
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true, true);
checkpointTask.shutdown();
ThreadPoolUtils.shutdown(checkpointExecutor);
checkpointExecutor = null;
queues.clear();
topics.clear();
IOException firstException = null;
try {
journal.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
}
longTermPersistence.stop();
if (firstException != null) {
throw firstException;
}
}
// Properties
// -------------------------------------------------------------------------
public PersistenceAdapter getLongTermPersistence() {
return longTermPersistence;
}
/**
* @return Returns the wireFormat.
*/
public WireFormat getWireFormat() {
return wireFormat;
}
// Implementation methods
// -------------------------------------------------------------------------
/**
* The Journal give us a call back so that we can move old data out of the
* journal. Taking a checkpoint does this for us.
*
* @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
*/
@Override
public void overflowNotification(RecordLocation safeLocation) {
checkpoint(false, true);
}
/**
* When we checkpoint we move all the journalled data to long term storage.
*
*/
public void checkpoint(boolean sync, boolean fullCheckpoint) {
try {
if (journal == null) {
throw new IllegalStateException("Journal is closed.");
}
long now = System.currentTimeMillis();
CountDownLatch latch = null;
synchronized (this) {
latch = nextCheckpointCountDownLatch;
lastCheckpointRequest = now;
if (fullCheckpoint) {
this.fullCheckPoint = true;
}
}
checkpointTask.wakeup();
if (sync) {
LOG.debug("Waking for checkpoint to complete.");
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Request to start checkpoint failed: " + e, e);
}
}
@Override
public void checkpoint(boolean sync) {
checkpoint(sync, sync);
}
/**
* This does the actual checkpoint.
*
* @return
*/
public boolean doCheckpoint() {
CountDownLatch latch = null;
boolean fullCheckpoint;
synchronized (this) {
latch = nextCheckpointCountDownLatch;
nextCheckpointCountDownLatch = new CountDownLatch(1);
fullCheckpoint = this.fullCheckPoint;
this.fullCheckPoint = false;
}
try {
LOG.debug("Checkpoint started.");
RecordLocation newMark = null;
ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
//
// We do many partial checkpoints (fullCheckpoint==false) to move
// topic messages
// to long term store as soon as possible.
//
// We want to avoid doing that for queue messages since removes the
// come in the same
// checkpoint cycle will nullify the previous message add.
// Therefore, we only
// checkpoint queues on the fullCheckpoint cycles.
//
if (fullCheckpoint) {
Iterator<JournalMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
try {
final JournalMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
@Override
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
});
futureTasks.add(task);
checkpointExecutor.execute(task);
} catch (Exception e) {
LOG.error("Failed to checkpoint a message store: " + e, e);
}
}
}
Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
while (iterator.hasNext()) {
try {
final JournalTopicMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
@Override
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
});
futureTasks.add(task);
checkpointExecutor.execute(task);
} catch (Exception e) {
LOG.error("Failed to checkpoint a message store: " + e, e);
}
}
try {
for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
FutureTask<RecordLocation> ft = iter.next();
RecordLocation mark = ft.get();
// We only set a newMark on full checkpoints.
if (fullCheckpoint) {
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
newMark = mark;
}
}
}
} catch (Throwable e) {
LOG.error("Failed to checkpoint a message store: " + e, e);
}
if (fullCheckpoint) {
try {
if (newMark != null) {
LOG.debug("Marking journal at: " + newMark);
journal.setMark(newMark, true);
}
} catch (Exception e) {
LOG.error("Failed to mark the Journal: " + e, e);
}
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// We may be check pointing more often than the
// checkpointInterval if under high use
// But we don't want to clean up the db that often.
long now = System.currentTimeMillis();
if (now > lastCleanup + checkpointInterval) {
lastCleanup = now;
((JDBCPersistenceAdapter)longTermPersistence).cleanup();
}
}
}
LOG.debug("Checkpoint done.");
} finally {
latch.countDown();
}
synchronized (this) {
return this.fullCheckPoint;
}
}
/**
* @param location
* @return
* @throws IOException
*/
public DataStructure readCommand(RecordLocation location) throws IOException {
try {
Packet packet = journal.read(location);
return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
} catch (InvalidRecordLocationException e) {
throw createReadException(location, e);
} catch (IOException e) {
throw createReadException(location, e);
}
}
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
*
* @throws IOException
* @throws IOException
* @throws InvalidRecordLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
RecordLocation pos = null;
int transactionCounter = 0;
LOG.info("Journal Recovery Started from: " + journal);
ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
// While we have records in the journal.
while ((pos = journal.getNextRecordLocation(pos)) != null) {
Packet data = journal.read(pos);
DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
if (c instanceof Message) {
Message message = (Message)c;
JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
if (message.isInTransaction()) {
transactionStore.addMessage(store, message, pos);
} else {
store.replayAddMessage(context, message);
transactionCounter++;
}
} else {
switch (c.getDataStructureType()) {
case JournalQueueAck.DATA_STRUCTURE_TYPE: {
JournalQueueAck command = (JournalQueueAck)c;
JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
if (command.getMessageAck().isInTransaction()) {
transactionStore.removeMessage(store, command.getMessageAck(), pos);
} else {
store.replayRemoveMessage(context, command.getMessageAck());
transactionCounter++;
}
}
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE: {
JournalTopicAck command = (JournalTopicAck)c;
JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
if (command.getTransactionId() != null) {
transactionStore.acknowledge(store, command, pos);
} else {
store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
transactionCounter++;
}
}
break;
case JournalTransaction.DATA_STRUCTURE_TYPE: {
JournalTransaction command = (JournalTransaction)c;
try {
// Try to replay the packet.
switch (command.getType()) {
case JournalTransaction.XA_PREPARE:
transactionStore.replayPrepare(command.getTransactionId());
break;
case JournalTransaction.XA_COMMIT:
case JournalTransaction.LOCAL_COMMIT:
Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
if (tx == null) {
break; // We may be trying to replay a commit
}
// that
// was already committed.
// Replay the committed operations.
tx.getOperations();
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation)iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
op.store.replayAddMessage(context, (Message)op.data);
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
op.store.replayRemoveMessage(context, (MessageAck)op.data);
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck)op.data;
((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
}
}
transactionCounter++;
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
transactionStore.replayRollback(command.getTransactionId());
break;
default:
throw new IOException("Invalid journal command type: " + command.getType());
}
} catch (IOException e) {
LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace)c;
LOG.debug("TRACE Entry: " + trace.getMessage());
break;
default:
LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
RecordLocation location = writeTraceMessage("RECOVERED", true);
journal.setMark(location, true);
LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
}
private IOException createReadException(RecordLocation location, Exception e) {
return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
}
protected IOException createWriteException(DataStructure packet, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
}
protected IOException createWriteException(String command, Exception e) {
return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
}
protected IOException createRecoveryFailedException(Exception e) {
return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
}
/**
* @param command
* @param sync
* @return
* @throws IOException
*/
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if (started.get()) {
try {
return journal.write(toPacket(wireFormat.marshal(command)), sync);
} catch (IOException ioe) {
LOG.error("Cannot write to the journal", ioe);
brokerService.handleIOException(ioe);
throw ioe;
}
}
throw new IOException("closed");
}
private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
JournalTrace trace = new JournalTrace();
trace.setMessage(message);
return writeCommand(trace, sync);
}
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
boolean sync = newPercentUsage >= 90;
checkpoint(sync, true);
}
}
public JournalTransactionStore getTransactionStore() {
return transactionStore;
}
@Override
public void deleteAllMessages() throws IOException {
try {
JournalTrace trace = new JournalTrace();
trace.setMessage("DELETED");
RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
journal.setMark(location, true);
LOG.info("Journal deleted: ");
} catch (IOException e) {
throw e;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
longTermPersistence.deleteAllMessages();
}
public SystemUsage getUsageManager() {
return usageManager;
}
public int getMaxCheckpointMessageAddSize() {
return maxCheckpointMessageAddSize;
}
public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
}
public int getMaxCheckpointWorkers() {
return maxCheckpointWorkers;
}
public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
this.maxCheckpointWorkers = maxCheckpointWorkers;
}
public long getCheckpointInterval() {
return checkpointInterval;
}
public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
}
public boolean isUseExternalMessageReferences() {
return false;
}
public void setUseExternalMessageReferences(boolean enable) {
if (enable) {
throw new IllegalArgumentException("The journal does not support message references.");
}
}
public Packet toPacket(ByteSequence sequence) {
return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
}
public ByteSequence toByteSequence(Packet packet) {
org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
@Override
public void setBrokerName(String brokerName) {
longTermPersistence.setBrokerName(brokerName);
}
@Override
public String toString() {
return "JournalPersistenceAdapter(" + longTermPersistence + ")";
}
@Override
public void setDirectory(File dir) {
this.directory=dir;
}
@Override
public File getDirectory(){
return directory;
}
@Override
public long size(){
return 0;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
PersistenceAdapter pa = getLongTermPersistence();
if( pa instanceof BrokerServiceAware ) {
((BrokerServiceAware)pa).setBrokerService(brokerService);
}
}
@Override
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
@Override
public void allowIOResumption() {
longTermPersistence.allowIOResumption();
}
@Override
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
return longTermPersistence.createJobSchedulerStore();
}
}

View File

@ -1,274 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.File;
import java.io.IOException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Factory class that can create PersistenceAdapter objects.
*
* @deprecated Deprecated for removal as this PersistenceAdapter is no longer used and
* replaced by the JDBCPersistenceAdapter.
*
* @org.apache.xbean.XBean
*
*/
@Deprecated(forRemoval = true)
public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
private long checkpointInterval = 1000 * 60 * 5;
private int journalLogFileSize = 1024 * 1024 * 20;
private int journalLogFiles = 2;
private TaskRunnerFactory taskRunnerFactory;
private Journal journal;
private boolean useJournal = true;
private boolean useQuickJournal;
private File journalArchiveDirectory;
private boolean failIfJournalIsLocked;
private int journalThreadPriority = Thread.MAX_PRIORITY;
private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
private boolean useDedicatedTaskRunner;
public PersistenceAdapter createPersistenceAdapter() throws IOException {
jdbcPersistenceAdapter.setDataSource(getDataSource());
if (!useJournal) {
return jdbcPersistenceAdapter;
}
JournalPersistenceAdapter result = new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
result.setDirectory(getDataDirectoryFile());
result.setCheckpointInterval(getCheckpointInterval());
return result;
}
public int getJournalLogFiles() {
return journalLogFiles;
}
/**
* Sets the number of journal log files to use
*/
public void setJournalLogFiles(int journalLogFiles) {
this.journalLogFiles = journalLogFiles;
}
public int getJournalLogFileSize() {
return journalLogFileSize;
}
/**
* Sets the size of the journal log files
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalLogFileSize(int journalLogFileSize) {
this.journalLogFileSize = journalLogFileSize;
}
public JDBCPersistenceAdapter getJdbcAdapter() {
return jdbcPersistenceAdapter;
}
public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
this.jdbcPersistenceAdapter = jdbcAdapter;
}
public boolean isUseJournal() {
return useJournal;
}
public long getCheckpointInterval() {
return checkpointInterval;
}
public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
}
/**
* Enables or disables the use of the journal. The default is to use the
* journal
*
* @param useJournal
*/
public void setUseJournal(boolean useJournal) {
this.useJournal = useJournal;
}
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
true, 1000, isUseDedicatedTaskRunner());
}
return taskRunnerFactory;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
public Journal getJournal() throws IOException {
if (journal == null) {
createJournal();
}
return journal;
}
public void setJournal(Journal journal) {
this.journal = journal;
}
public File getJournalArchiveDirectory() {
if (journalArchiveDirectory == null && useQuickJournal) {
journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
}
return journalArchiveDirectory;
}
public void setJournalArchiveDirectory(File journalArchiveDirectory) {
this.journalArchiveDirectory = journalArchiveDirectory;
}
public boolean isUseQuickJournal() {
return useQuickJournal;
}
/**
* Enables or disables the use of quick journal, which keeps messages in the
* journal and just stores a reference to the messages in JDBC. Defaults to
* false so that messages actually reside long term in the JDBC database.
*/
public void setUseQuickJournal(boolean useQuickJournal) {
this.useQuickJournal = useQuickJournal;
}
public JDBCAdapter getAdapter() throws IOException {
return jdbcPersistenceAdapter.getAdapter();
}
public void setAdapter(JDBCAdapter adapter) {
jdbcPersistenceAdapter.setAdapter(adapter);
}
public Statements getStatements() {
return jdbcPersistenceAdapter.getStatements();
}
public void setStatements(Statements statements) {
jdbcPersistenceAdapter.setStatements(statements);
}
/**
* Sets whether or not an exclusive database lock should be used to enable
* JDBC Master/Slave. Enabled by default.
*/
public void setUseDatabaseLock(boolean useDatabaseLock) {
jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
}
public boolean isCreateTablesOnStartup() {
return jdbcPersistenceAdapter.isCreateTablesOnStartup();
}
/**
* Sets whether or not tables are created on startup
*/
public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
}
public int getJournalThreadPriority() {
return journalThreadPriority;
}
/**
* Sets the thread priority of the journal thread
*/
public void setJournalThreadPriority(int journalThreadPriority) {
this.journalThreadPriority = journalThreadPriority;
}
/**
* @throws IOException
*/
protected void createJournal() throws IOException {
File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
if (failIfJournalIsLocked) {
journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
getJournalArchiveDirectory());
} else {
while (true) {
try {
journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
getJournalArchiveDirectory());
break;
} catch (JournalLockedException e) {
LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
+ " seconds for the journal to be unlocked.");
try {
Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
}
}
}
}
}
@Override
public Locker createDefaultLocker() throws IOException {
return null;
}
@Override
public void init() throws Exception {
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {}
@Override
protected void doStart() throws Exception {}
}

View File

@ -1,242 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
*
*/
public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class);
private TopicMessageStore longTermStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
ActiveMQTopic destinationName) {
super(adapter, checkpointStore, destinationName);
this.longTermStore = checkpointStore;
}
@Override
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
throws Exception {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
@Override
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
MessageRecoveryListener listener) throws Exception {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
}
@Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return longTermStore.lookupSubscription(clientId, subscriptionName);
}
@Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.addSubscription(subscriptionInfo, retroactive);
}
@Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
}
/**
*/
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
final MessageId messageId, MessageAck originalAck) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
ack.setTransactionId(context.getTransaction() != null
? context.getTransaction().getTransactionId() : null);
final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
if (!context.isInTransaction()) {
if (debug) {
LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
}
acknowledge(messageId, location, key);
} else {
if (debug) {
LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
}
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.acknowledge(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
}
synchronized (JournalTopicMessageStore.this) {
inFlightTxLocations.remove(location);
acknowledge(messageId, location, key);
}
}
@Override
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
}
synchronized (JournalTopicMessageStore.this) {
inFlightTxLocations.remove(location);
}
}
});
}
}
public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
MessageId messageId) {
try {
SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
if (sub != null) {
longTermStore.acknowledge(context, clientId, subscritionName, messageId, null);
}
} catch (Throwable e) {
LOG.debug("Could not replay acknowledge for message '" + messageId
+ "'. Message may have already been acknowledged. reason: " + e);
}
}
/**
* @param messageId
* @param location
* @param key
*/
protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
synchronized (this) {
lastLocation = location;
ackedLastAckLocations.put(key, messageId);
}
}
@Override
public RecordLocation checkpoint() throws IOException {
final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations;
this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
}
return super.checkpoint(new Callback() {
@Override
public void execute() throws Exception {
// Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) {
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
MessageAck ack = new MessageAck();
ack.setMessageID(identity);
longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
subscriptionKey.subscriptionName, identity, ack);
}
}
});
}
/**
* @return Returns the longTermStore.
*/
public TopicMessageStore getLongTermTopicMessageStore() {
return longTermStore;
}
@Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
longTermStore.deleteSubscription(clientId, subscriptionName);
}
@Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
@Override
public int getMessageCount(String clientId, String subscriberName) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId, subscriberName);
}
@Override
public long getMessageSize(String clientId, String subscriberName) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageSize(clientId, subscriberName);
}
@Override
public void resetBatching(String clientId, String subscriptionName) {
longTermStore.resetBatching(clientId, subscriptionName);
}
private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
@Override
public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
return stats;
}
}

View File

@ -1,350 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.journal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
/**
*/
public class JournalTransactionStore implements TransactionStore {
private final JournalPersistenceAdapter peristenceAdapter;
private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private boolean doingRecover;
public static class TxOperation {
static final byte ADD_OPERATION_TYPE = 0;
static final byte REMOVE_OPERATION_TYPE = 1;
static final byte ACK_OPERATION_TYPE = 3;
public byte operationType;
public JournalMessageStore store;
public Object data;
public TxOperation(byte operationType, JournalMessageStore store, Object data) {
this.operationType = operationType;
this.store = store;
this.data = data;
}
}
/**
* Operations
*
*
*/
public static class Tx {
private final RecordLocation location;
private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(RecordLocation location) {
this.location = location;
}
public void add(JournalMessageStore store, Message msg) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
}
public void add(JournalMessageStore store, MessageAck ack) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
}
public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
}
public Message[] getMessages() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
list.add(op.data);
}
}
Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
list.add(op.data);
}
}
MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
public ArrayList<TxOperation> getOperations() {
return operations;
}
}
public JournalTransactionStore(JournalPersistenceAdapter adapter) {
this.peristenceAdapter = adapter;
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException {
Tx tx = null;
synchronized (inflightTransactions) {
tx = inflightTransactions.remove(txid);
}
if (tx == null) {
return;
}
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
true);
synchronized (preparedTransactions) {
preparedTransactions.put(txid, tx);
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException {
Tx tx = null;
synchronized (inflightTransactions) {
tx = inflightTransactions.remove(txid);
}
if (tx == null) {
return;
}
synchronized (preparedTransactions) {
preparedTransactions.put(txid, tx);
}
}
public Tx getTx(Object txid, RecordLocation location) {
Tx tx = null;
synchronized (inflightTransactions) {
tx = inflightTransactions.get(txid);
}
if (tx == null) {
tx = new Tx(location);
inflightTransactions.put(txid, tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
Tx tx;
if (preCommit != null) {
preCommit.run();
}
if (wasPrepared) {
synchronized (preparedTransactions) {
tx = preparedTransactions.remove(txid);
}
} else {
synchronized (inflightTransactions) {
tx = inflightTransactions.remove(txid);
}
}
if (tx == null) {
if (postCommit != null) {
postCommit.run();
}
return;
}
if (txid.isXATransaction()) {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
wasPrepared), true);
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
wasPrepared), true);
}
if (postCommit != null) {
postCommit.run();
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
if (wasPrepared) {
synchronized (preparedTransactions) {
return preparedTransactions.remove(txid);
}
} else {
synchronized (inflightTransactions) {
return inflightTransactions.remove(txid);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
Tx tx = null;
synchronized (inflightTransactions) {
tx = inflightTransactions.remove(txid);
}
if (tx != null) {
synchronized (preparedTransactions) {
tx = preparedTransactions.remove(txid);
}
}
if (tx != null) {
if (txid.isXATransaction()) {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
false), true);
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
txid, false), true);
}
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void replayRollback(TransactionId txid) throws IOException {
boolean inflight = false;
synchronized (inflightTransactions) {
inflight = inflightTransactions.remove(txid) != null;
}
if (inflight) {
synchronized (preparedTransactions) {
preparedTransactions.remove(txid);
}
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the in-flight transactions get rolled back..
synchronized (inflightTransactions) {
inflightTransactions.clear();
}
this.doingRecover = true;
try {
Map<TransactionId, Tx> txs = null;
synchronized (preparedTransactions) {
txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
}
for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
Object txid = iter.next();
Tx tx = txs.get(txid);
listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
}
} finally {
this.doingRecover = false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message);
}
/**
* @param ack
* @throws IOException
*/
public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public RecordLocation checkpoint() throws IOException {
// Nothing really to checkpoint.. since, we don't
// checkpoint tx operations in to long term store until they are
// committed.
// But we keep track of the first location of an operation
// that was associated with an active tx. The journal can not
// roll over active tx records.
RecordLocation rc = null;
synchronized (inflightTransactions) {
for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
Tx tx = iter.next();
RecordLocation location = tx.location;
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
}
}
}
synchronized (preparedTransactions) {
for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
Tx tx = iter.next();
RecordLocation location = tx.location;
if (rc == null || rc.compareTo(location) < 0) {
rc = location;
}
}
return rc;
}
}
public boolean isDoingRecover() {
return doingRecover;
}
}

View File

@ -1,27 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
<p>
Message persistence using a high performance transaction log via the Journal interface.
</p>
</body>
</html>

View File

@ -39,11 +39,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>

View File

@ -56,7 +56,6 @@
<bundle dependency="true">mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xpp3/${xpp3-bundle-version}</bundle>
<bundle dependency="true">mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xstream/${xstream-bundle-version}</bundle>
<bundle dependency="true">mvn:org.apache.aries/org.apache.aries.util/${aries-version}</bundle>
<bundle dependency="true">mvn:org.apache.activemq/activeio-core/${activeio-version}</bundle>
<bundle dependency="true">mvn:org.codehaus.jettison/jettison/${jettison-version}</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-core/${jackson-version}</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-databind/${jackson-version}</bundle>

View File

@ -40,11 +40,6 @@
<artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>

View File

@ -174,7 +174,6 @@
javax.resource*;resolution:=optional,
javax.servlet*;resolution:=optional,
com.thoughtworks.xstream*;resolution:=optional,
org.apache.activeio*;resolution:=optional,
org.apache.camel*;version="${camel-version-range}";resolution:=optional,
org.apache.camel.spring.handler;version="${camel-version-range}";resolution:=optional,
org.apache.camel.spring.xml.handler;version="${camel-version-range}";resolution:=optional,

View File

@ -69,11 +69,6 @@
<artifactId>activemq-kahadb-store</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>true</optional>
</dependency>
<!-- add the optional replication deps -->
<dependency>

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
import org.springframework.beans.factory.FactoryBean;
/**
* Creates a default persistence model using the Journal and JDBC
*
* @org.apache.xbean.XBean element="journaledJDBC"
*
*
*/
public class PersistenceAdapterFactoryBean extends JournalPersistenceAdapterFactory implements FactoryBean {
private PersistenceAdapter persistenceAdaptor;
public Object getObject() throws Exception {
if (persistenceAdaptor == null) {
persistenceAdaptor = createPersistenceAdapter();
}
return persistenceAdaptor;
}
public Class getObjectType() {
return PersistenceAdapter.class;
}
public boolean isSingleton() {
return false;
}
}

View File

@ -42,10 +42,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-console</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>

View File

@ -43,10 +43,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-console</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>

View File

@ -46,10 +46,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-console</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>

View File

@ -74,11 +74,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>

View File

@ -50,7 +50,6 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DefaultDatabaseLocker;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.usage.SystemUsage;
@ -87,43 +86,6 @@ public class ConfigTest {
* for succeeding creation. It uses the first created directory as the root.
*/
/*
* This tests creating a journal persistence adapter using the persistence
* adapter factory bean
*/
@Test
public void testJournaledJDBCConfig() throws Exception {
File journalFile = new File(JOURNAL_ROOT + "testJournaledJDBCConfig/journal");
recursiveDelete(journalFile);
File derbyFile = new File(DERBY_ROOT + "testJournaledJDBCConfig/derbydb"); // Default
recursiveDelete(derbyFile);
BrokerService broker;
broker = createBroker(new FileSystemResource(CONF_ROOT + "journaledjdbc-example.xml"));
try {
assertEquals("Broker Config Error (brokerName)", "brokerJournaledJDBCConfigTest", broker.getBrokerName());
PersistenceAdapter adapter = broker.getPersistenceAdapter();
assertTrue("Should have created a journal persistence adapter", adapter instanceof JournalPersistenceAdapter);
assertTrue("Should have created a derby directory at " + derbyFile.getAbsolutePath(), derbyFile.exists());
assertTrue("Should have created a journal directory at " + journalFile.getAbsolutePath(), journalFile.exists());
// Check persistence factory configurations
broker.getPersistenceAdapter();
assertTrue(broker.getSystemUsage().getStoreUsage().getStore() instanceof JournalPersistenceAdapter);
LOG.info("Success");
} finally {
if (broker != null) {
broker.stop();
}
}
}
@Test
public void testJdbcLockConfigOverride() throws Exception {
@ -291,32 +253,6 @@ public class ConfigTest {
}
}
/*
* This tests creating a journal persistence adapter using xbeans-spring
*/
@Test
public void testJournalConfig() throws Exception {
File journalFile = new File(JOURNAL_ROOT + "testJournalConfig/journal");
recursiveDelete(journalFile);
BrokerService broker;
broker = createBroker(new FileSystemResource(CONF_ROOT + "journal-example.xml"));
try {
assertEquals("Broker Config Error (brokerName)", "brokerJournalConfigTest", broker.getBrokerName());
PersistenceAdapter adapter = broker.getPersistenceAdapter();
assertTrue("Should have created a journal persistence adapter", adapter instanceof JournalPersistenceAdapter);
assertTrue("Should have created a journal directory at " + journalFile.getAbsolutePath(), journalFile.exists());
LOG.info("Success");
} finally {
if (broker != null) {
broker.stop();
}
}
}
/*
* This tests creating a memory persistence adapter using xbeans-spring
*/

View File

@ -61,13 +61,6 @@ public class InactiveDurableTopicTest extends TestCase {
broker = new BrokerService();
//broker.setPersistenceAdapter(new KahaPersistenceAdapter());
/*
* JournalPersistenceAdapterFactory factory = new
* JournalPersistenceAdapterFactory();
* factory.setDataDirectoryFile(broker.getDataDirectory());
* factory.setTaskRunnerFactory(broker.getTaskRunnerFactory());
* factory.setUseJournal(false); broker.setPersistenceFactory(factory);
*/
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
broker.start();
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

View File

@ -58,13 +58,7 @@ public class InactiveQueueTest extends TestCase {
// broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File
// ("TEST_STUFD")));
/*
* JournalPersistenceAdapterFactory factory = new
* JournalPersistenceAdapterFactory();
* factory.setDataDirectoryFile(broker.getDataDirectory());
* factory.setTaskRunnerFactory(broker.getTaskRunnerFactory());
* factory.setUseJournal(false); broker.setPersistenceFactory(factory);
*/
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
broker.start();
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

View File

@ -1,38 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
/**
*
*/
public class JournalDurableSubscriptionTest extends DurableSubscriptionTestSupport {
protected PersistenceAdapter createPersistenceAdapter() throws IOException {
File dataDir = new File("target/test-data/durableJournal");
JournalPersistenceAdapterFactory factory = new JournalPersistenceAdapterFactory();
factory.setDataDirectoryFile(dataDir);
factory.setUseJournal(true);
factory.setJournalLogFileSize(1024 * 64);
return factory.createPersistenceAdapter();
}
}

View File

@ -1,62 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.xbean;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
/**
*
*/
public class JDBCPersistenceXBeanConfigTest extends TestCase {
protected BrokerService brokerService;
public void testConfiguredCorrectly() throws Exception {
PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
assertNotNull(persistenceAdapter);
assertTrue(persistenceAdapter instanceof JDBCPersistenceAdapter);
JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter)persistenceAdapter;
assertEquals("BROKER1.", jpa.getStatements().getTablePrefix());
}
protected void setUp() throws Exception {
brokerService = createBroker();
brokerService.start();
}
protected void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
}
protected BrokerService createBroker() throws Exception {
String uri = "org/apache/activemq/xbean/jdbc-persistence-test.xml";
return BrokerFactory.createBroker(new URI("xbean:" + uri));
}
}

View File

@ -32,10 +32,13 @@
<broker useJmx="true" deleteAllMessagesOnStartup="true" systemUsage="#memory-manager" xmlns="http://activemq.apache.org/schema/core">
<persistenceFactory>
<journalPersistenceAdapterFactory
useQuickJournal="false" journalLogFiles="2" dataDirectory="loadtest"/>
</persistenceFactory>
<persistenceAdapter>
<kahaDB directory="target/activemq-data/loadtester/kahadb"
cleanupInterval="300000" checkpointInterval="50000"
journalMaxWriteBatchSize="62k"
journalMaxFileLength="1g"
indexCacheSize="100000" indexWriteBatchSize="100000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:0"/>
@ -43,19 +46,5 @@
</broker>
<!-- The Derby Datasource that will be used by the Broker -->
<!--
<bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource" destroy-method="close">
<property name="serverName" value="localhost"/>
<property name="databaseName" value="activemq"/>
<property name="portNumber" value="0"/>
<property name="user" value="activemq"/>
<property name="password" value="activemq"/>
<property name="dataSourceName" value="postgres"/>
<property name="initialConnections" value="1"/>
<property name="maxConnections" value="10"/>
</bean>
-->
</beans>
<!-- END SNIPPET: xbean -->

View File

@ -1,60 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<amq:broker brokerName="brokerJournalConfigTest" persistent="true" useShutdownHook="false" deleteAllMessagesOnStartup="true">
<amq:persistenceAdapter>
<amq:journalPersistenceAdapter>
<amq:journal>
<ref bean="myJournalImpl"/>
</amq:journal>
<amq:persistenceAdapter>
<amq:memoryPersistenceAdapter createTransactionStore="true"/>
</amq:persistenceAdapter>
<amq:taskRunnerFactory>
<bean id="myTaskRunnerFactory" class="org.apache.activemq.thread.TaskRunnerFactory"/>
</amq:taskRunnerFactory>
</amq:journalPersistenceAdapter>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:61635"/>
</amq:transportConnectors>
</amq:broker>
<!-- The journal implementation that will be used -->
<bean id="myJournalImpl" class="org.apache.activeio.journal.active.JournalImpl">
<constructor-arg index="0">
<bean id="myFile" class="java.io.File">
<constructor-arg index="0">
<value>target/test-data/testJournalConfig/journal</value>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
</beans>

View File

@ -1,36 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<amq:broker brokerName="brokerJournaledJDBCConfigTest" persistent="true" useShutdownHook="false" deleteAllMessagesOnStartup="true">
<amq:persistenceFactory>
<amq:journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" dataDirectory="target/test-data/testJournaledJDBCConfig" />
</amq:persistenceFactory>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:61635"/>
</amq:transportConnectors>
</amq:broker>
</beans>

View File

@ -26,13 +26,17 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<!-- Default configuration -->
<broker id="default" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
<broker id="default" useJmx="false" xmlns="http://activemq.apache.org/schema/core" deleteAllMessagesOnStartup="true">
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="2" dataDirectory="target/foo"/>
</persistenceFactory>
<persistenceAdapter>
<kahaDB directory="target/activemq-data/command/kahadb"
cleanupInterval="300000" checkpointInterval="50000"
journalMaxWriteBatchSize="62k"
journalMaxFileLength="1g"
indexCacheSize="100000" indexWriteBatchSize="100000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616" />
</transportConnectors>

View File

@ -22,9 +22,9 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" deleteAllMessagesOnStartup="true">
<!-- In ActiveMQ 4, you can setup destination policies -->
<!-- In ActiveMQ 4, you can setup destination policies -->
<destinationPolicy>
<policyMap><policyEntries>
@ -55,13 +55,13 @@
-->
</networkConnectors>
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="../data"/>
<!-- To use a different datasource, use the following syntax : -->
<!--
<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="../data" dataSource="#postgres-ds"/>
-->
</persistenceFactory>
<persistenceAdapter>
<kahaDB directory="target/activemq-data/usecases/kahadb"
cleanupInterval="300000" checkpointInterval="50000"
journalMaxWriteBatchSize="62k"
journalMaxFileLength="1g"
indexCacheSize="100000" indexWriteBatchSize="100000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
@ -69,40 +69,5 @@
</broker>
<!-- This xbean configuration file supports all the standard spring xml configuration options -->
<!-- Postgres DataSource Sample Setup -->
<!--
<bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource" destroy-method="close">
<property name="serverName" value="localhost"/>
<property name="databaseName" value="activemq"/>
<property name="portNumber" value="0"/>
<property name="user" value="activemq"/>
<property name="password" value="activemq"/>
<property name="dataSourceName" value="postgres"/>
<property name="initialConnections" value="1"/>
<property name="maxConnections" value="10"/>
</bean>
-->
<!-- MySql DataSource Sample Setup -->
<!--
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
-->
<!-- Embedded Derby DataSource Sample Setup -->
<!--
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
-->
</beans>
<!-- END SNIPPET: xbean -->

View File

@ -27,7 +27,7 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" deleteAllMessagesOnStartup="true">
<networkConnectors>
<!--
@ -36,14 +36,13 @@
-->
</networkConnectors>
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/target" />
<!-- To use a different dataSource, use the following syntax : -->
<!--
<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/activemq-data" dataSource="#mysql-ds"/>
-->
</persistenceFactory>
<persistenceAdapter>
<kahaDB directory="target/activemq-data/xbean/kahadb"
cleanupInterval="300000" checkpointInterval="50000"
journalMaxWriteBatchSize="62k"
journalMaxFileLength="1g"
indexCacheSize="100000" indexWriteBatchSize="100000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:61636" />
@ -51,15 +50,5 @@
</broker>
<!-- MySql DataSource Sample Setup -->
<!--
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
-->
</beans>
<!-- END SNIPPET: example -->

View File

@ -26,7 +26,7 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<!-- for easier integration with the JUnit tests, lets not start the broker when the ApplicationContext starts -->
<broker useJmx="true" start="false" xmlns="http://activemq.apache.org/schema/core">
<broker useJmx="true" start="false" xmlns="http://activemq.apache.org/schema/core" deleteAllMessagesOnStartup="true">
<destinationPolicy>
<policyMap>
<policyEntries>
@ -43,14 +43,17 @@
</policyMap>
</destinationPolicy>
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/target/data" />
</persistenceFactory>
<persistenceAdapter>
<kahaDB directory="target/activemq-data/xbean-2/kahadb"
cleanupInterval="300000" checkpointInterval="50000"
journalMaxWriteBatchSize="62k"
journalMaxFileLength="1g"
indexCacheSize="100000" indexWriteBatchSize="100000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616" />
</transportConnectors>
</broker>
</beans>
</beans>

View File

@ -1,42 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core">
<persistenceFactory>
<journalPersistenceAdapterFactory useJournal="false">
<statements>
<statements tablePrefix="BROKER1."/>
</statements>
</journalPersistenceAdapterFactory>
</persistenceFactory>
</broker>
</beans>
<!-- END SNIPPET: xbean -->

View File

@ -171,16 +171,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jaas</artifactId>

View File

@ -37,10 +37,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-client</artifactId>

View File

@ -184,7 +184,6 @@
<include>${pom.groupId}:activemq-pool</include>
<include>${pom.groupId}:activemq-partition</include>
<include>${pom.groupId}:activemq-shiro</include>
<include>${pom.groupId}:activeio-core</include>
<include>commons-beanutils:commons-beanutils</include>
<include>commons-collections:commons-collections</include>
<include>commons-io:commons-io</include>

View File

@ -134,15 +134,6 @@
<kahaDB directory="${activemq.data}/dynamic-broker1/kahadb" indexWriteBatchSize="1000" enableIndexWriteAsync="true" enableJournalDiskSyncs="false" />
</persistenceAdapter>
<!--
Configure the following if you wish to use journaled JDBC for message
persistence.
<persistenceFactory>
<journalPersistenceAdapterFactory dataDirectory="${activemq.data}" dataSource="#postgres-ds"/>
</persistenceFactory>
-->
<!--
Configure the following if you wish to use non-journaled JDBC for message
persistence.

View File

@ -27,7 +27,7 @@
<broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="../data"/>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceFactory>
<transportConnectors>

13
pom.xml
View File

@ -34,7 +34,6 @@
<!-- for reproducible builds -->
<project.build.outputTimestamp>2023-03-28T14:10:59Z</project.build.outputTimestamp>
<activeio-version>3.1.4</activeio-version>
<siteId>activemq-${project.version}</siteId>
<projectName>Apache ActiveMQ</projectName>
<!-- base url for site deployment. See distribution management for full url. Override this in settings.xml for staging -->
@ -415,18 +414,6 @@
<version>${project.version}</version>
<type>war</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
<version>${activeio-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activeio-core</artifactId>
<version>${activeio-version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-generator</artifactId>