diff --git a/activemq-console/pom.xml b/activemq-console/pom.xml index f3cb911e60..2321eaf0d9 100644 --- a/activemq-console/pom.xml +++ b/activemq-console/pom.xml @@ -49,10 +49,6 @@ ${project.groupId} activemq-kahadb-store - - ${project.groupId} - activeio-core - diff --git a/activemq-jdbc-store/pom.xml b/activemq-jdbc-store/pom.xml index 2e5ce68eec..0e2cb1d02a 100644 --- a/activemq-jdbc-store/pom.xml +++ b/activemq-jdbc-store/pom.xml @@ -53,11 +53,6 @@ derbytools true - - ${project.groupId} - activeio-core - true - diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java deleted file mode 100644 index 7ec10c4d9a..0000000000 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java +++ /dev/null @@ -1,429 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.journal; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.activeio.journal.RecordLocation; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.JournalQueueAck; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.filter.NonCachedMessageEvaluationContext; -import org.apache.activemq.store.IndexListener; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.AbstractMessageStore; -import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.util.Callback; -import org.apache.activemq.util.TransactionTemplate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A MessageStore that uses a Journal to store it's messages. - * - * - */ -public class JournalMessageStore extends AbstractMessageStore { - - private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class); - - protected final JournalPersistenceAdapter peristenceAdapter; - protected final JournalTransactionStore transactionStore; - protected final MessageStore longTermStore; - protected final TransactionTemplate transactionTemplate; - protected RecordLocation lastLocation; - protected Set inFlightTxLocations = new HashSet(); - - private Map messages = new LinkedHashMap(); - private List messageAcks = new ArrayList(); - - /** A MessageStore that we can use to retrieve messages quickly. */ - private Map cpAddedMessageIds; - - - private MemoryUsage memoryUsage; - - public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { - super(destination); - this.peristenceAdapter = adapter; - this.transactionStore = adapter.getTransactionStore(); - this.longTermStore = checkpointStore; - this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext())); - } - - - public void setMemoryUsage(MemoryUsage memoryUsage) { - this.memoryUsage=memoryUsage; - longTermStore.setMemoryUsage(memoryUsage); - } - - /** - * Not synchronized since the Journal has better throughput if you increase - * the number of concurrent writes that it is doing. - */ - public void addMessage(final ConnectionContext context, final Message message) throws IOException { - - final MessageId id = message.getMessageId(); - - final boolean debug = LOG.isDebugEnabled(); - message.incrementReferenceCount(); - - final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired()); - if (!context.isInTransaction()) { - if (debug) { - LOG.debug("Journalled message add for: " + id + ", at: " + location); - } - addMessage(context, message, location); - } else { - if (debug) { - LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); - } - synchronized (this) { - inFlightTxLocations.add(location); - } - transactionStore.addMessage(this, message, location); - context.getTransaction().addSynchronization(new Synchronization() { - public void afterCommit() throws Exception { - if (debug) { - LOG.debug("Transacted message add commit for: " + id + ", at: " + location); - } - synchronized (JournalMessageStore.this) { - inFlightTxLocations.remove(location); - addMessage(context, message, location); - } - } - - public void afterRollback() throws Exception { - if (debug) { - LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); - } - synchronized (JournalMessageStore.this) { - inFlightTxLocations.remove(location); - } - message.decrementReferenceCount(); - } - }); - } - } - - void addMessage(ConnectionContext context, final Message message, final RecordLocation location) { - synchronized (this) { - lastLocation = location; - MessageId id = message.getMessageId(); - messages.put(id, message); - message.getMessageId().setFutureOrSequenceLong(0l); - if (indexListener != null) { - indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); - } - } - } - - public void replayAddMessage(ConnectionContext context, Message message) { - try { - // Only add the message if it has not already been added. - Message t = longTermStore.getMessage(message.getMessageId()); - if (t == null) { - longTermStore.addMessage(context, message); - } - } catch (Throwable e) { - LOG.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e); - } - } - - /** - */ - public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { - final boolean debug = LOG.isDebugEnabled(); - JournalQueueAck remove = new JournalQueueAck(); - remove.setDestination(destination); - remove.setMessageAck(ack); - - final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired()); - if (!context.isInTransaction()) { - if (debug) { - LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); - } - removeMessage(ack, location); - } else { - if (debug) { - LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); - } - synchronized (this) { - inFlightTxLocations.add(location); - } - transactionStore.removeMessage(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization() { - public void afterCommit() throws Exception { - if (debug) { - LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location); - } - synchronized (JournalMessageStore.this) { - inFlightTxLocations.remove(location); - removeMessage(ack, location); - } - } - - public void afterRollback() throws Exception { - if (debug) { - LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location); - } - synchronized (JournalMessageStore.this) { - inFlightTxLocations.remove(location); - } - } - }); - - } - } - - final void removeMessage(final MessageAck ack, final RecordLocation location) { - synchronized (this) { - lastLocation = location; - MessageId id = ack.getLastMessageId(); - Message message = messages.remove(id); - if (message == null) { - messageAcks.add(ack); - } else { - message.decrementReferenceCount(); - } - } - } - - public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { - try { - // Only remove the message if it has not already been removed. - Message t = longTermStore.getMessage(messageAck.getLastMessageId()); - if (t != null) { - longTermStore.removeMessage(context, messageAck); - } - } catch (Throwable e) { - LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e); - } - } - - /** - * @return - * @throws IOException - */ - public RecordLocation checkpoint() throws IOException { - return checkpoint(null); - } - - /** - * @return - * @throws IOException - */ - @SuppressWarnings("unchecked") - public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException { - - final List cpRemovedMessageLocations; - final List cpActiveJournalLocations; - final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); - - // swap out the message hash maps.. - synchronized (this) { - cpAddedMessageIds = this.messages; - cpRemovedMessageLocations = this.messageAcks; - - cpActiveJournalLocations = new ArrayList(inFlightTxLocations); - - this.messages = new LinkedHashMap(); - this.messageAcks = new ArrayList(); - } - - transactionTemplate.run(new Callback() { - public void execute() throws Exception { - - int size = 0; - - PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter(); - ConnectionContext context = transactionTemplate.getContext(); - - // Checkpoint the added messages. - synchronized (JournalMessageStore.this) { - Iterator iterator = cpAddedMessageIds.values().iterator(); - while (iterator.hasNext()) { - Message message = iterator.next(); - try { - longTermStore.addMessage(context, message); - } catch (Throwable e) { - LOG.warn("Message could not be added to long term store: " + e.getMessage(), e); - } - size += message.getSize(); - message.decrementReferenceCount(); - // Commit the batch if it's getting too big - if (size >= maxCheckpointMessageAddSize) { - persitanceAdapter.commitTransaction(context); - persitanceAdapter.beginTransaction(context); - size = 0; - } - } - } - - persitanceAdapter.commitTransaction(context); - persitanceAdapter.beginTransaction(context); - - // Checkpoint the removed messages. - Iterator iterator = cpRemovedMessageLocations.iterator(); - while (iterator.hasNext()) { - try { - MessageAck ack = iterator.next(); - longTermStore.removeMessage(transactionTemplate.getContext(), ack); - } catch (Throwable e) { - LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e); - } - } - - if (postCheckpointTest != null) { - postCheckpointTest.execute(); - } - } - - }); - - synchronized (this) { - cpAddedMessageIds = null; - } - - if (cpActiveJournalLocations.size() > 0) { - Collections.sort(cpActiveJournalLocations); - return cpActiveJournalLocations.get(0); - } - synchronized (this) { - return lastLocation; - } - } - - /** - * - */ - public Message getMessage(MessageId identity) throws IOException { - Message answer = null; - - synchronized (this) { - // Do we have a still have it in the journal? - answer = messages.get(identity); - if (answer == null && cpAddedMessageIds != null) { - answer = cpAddedMessageIds.get(identity); - } - } - - if (answer != null) { - return answer; - } - - // If all else fails try the long term message store. - return longTermStore.getMessage(identity); - } - - /** - * Replays the checkpointStore first as those messages are the oldest ones, - * then messages are replayed from the transaction log and then the cache is - * updated. - * - * @param listener - * @throws Exception - */ - public void recover(final MessageRecoveryListener listener) throws Exception { - peristenceAdapter.checkpoint(true, true); - longTermStore.recover(listener); - } - - public void start() throws Exception { - if (this.memoryUsage != null) { - this.memoryUsage.addUsageListener(peristenceAdapter); - } - longTermStore.start(); - } - - public void stop() throws Exception { - longTermStore.stop(); - if (this.memoryUsage != null) { - this.memoryUsage.removeUsageListener(peristenceAdapter); - } - } - - /** - * @return Returns the longTermStore. - */ - public MessageStore getLongTermMessageStore() { - return longTermStore; - } - - /** - * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) - */ - public void removeAllMessages(ConnectionContext context) throws IOException { - peristenceAdapter.checkpoint(true, true); - longTermStore.removeAllMessages(context); - } - - public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { - throw new IOException("The journal does not support message references."); - } - - public String getMessageReference(MessageId identity) throws IOException { - throw new IOException("The journal does not support message references."); - } - - /** - * @return - * @throws IOException - * @see org.apache.activemq.store.MessageStore#getMessageCount() - */ - public int getMessageCount() throws IOException { - peristenceAdapter.checkpoint(true, true); - return longTermStore.getMessageCount(); - } - - public long getMessageSize() throws IOException { - peristenceAdapter.checkpoint(true, true); - return longTermStore.getMessageSize(); - } - - public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - peristenceAdapter.checkpoint(true, true); - longTermStore.recoverNextMessages(maxReturned, listener); - - } - - public void resetBatching() { - longTermStore.resetBatching(); - - } - - @Override - public void setBatch(MessageId messageId) throws Exception { - peristenceAdapter.checkpoint(true, true); - longTermStore.setBatch(messageId); - } - -} diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java deleted file mode 100644 index 0d541c31c5..0000000000 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ /dev/null @@ -1,816 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.journal; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activeio.journal.InvalidRecordLocationException; -import org.apache.activeio.journal.Journal; -import org.apache.activeio.journal.JournalEventListener; -import org.apache.activeio.journal.RecordLocation; -import org.apache.activeio.packet.ByteArrayPacket; -import org.apache.activeio.packet.Packet; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.DataStructure; -import org.apache.activemq.command.JournalQueueAck; -import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.JournalTrace; -import org.apache.activemq.command.JournalTransaction; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.ProducerId; -import org.apache.activemq.filter.NonCachedMessageEvaluationContext; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.journal.JournalTransactionStore.Tx; -import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation; -import org.apache.activemq.thread.Scheduler; -import org.apache.activemq.thread.Task; -import org.apache.activemq.thread.TaskRunner; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.usage.Usage; -import org.apache.activemq.usage.UsageListener; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ThreadPoolUtils; -import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of {@link PersistenceAdapter} designed for use with a - * {@link Journal} and then check pointing asynchronously on a timeout with some - * other long term persistent storage. - * - * @deprecated - Deprecated for removal as this PersistenceAdapter is no longer used and - * replaced by the JDBCPersistenceAdapter. - * - * @org.apache.xbean.XBean - * - */ -@Deprecated(forRemoval = true) -public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { - - private BrokerService brokerService; - - protected Scheduler scheduler; - private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class); - - private Journal journal; - private PersistenceAdapter longTermPersistence; - - private final WireFormat wireFormat = new OpenWireFormat(); - - private final ConcurrentMap queues = new ConcurrentHashMap(); - private final ConcurrentMap topics = new ConcurrentHashMap(); - - private SystemUsage usageManager; - private long checkpointInterval = 1000 * 60 * 5; - private long lastCheckpointRequest = System.currentTimeMillis(); - private long lastCleanup = System.currentTimeMillis(); - private int maxCheckpointWorkers = 10; - private int maxCheckpointMessageAddSize = 1024 * 1024; - - private final JournalTransactionStore transactionStore = new JournalTransactionStore(this); - private ThreadPoolExecutor checkpointExecutor; - - private TaskRunner checkpointTask; - private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1); - private boolean fullCheckPoint; - - private final AtomicBoolean started = new AtomicBoolean(false); - - private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); - - private TaskRunnerFactory taskRunnerFactory; - private File directory; - - public JournalPersistenceAdapter() { - } - - public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { - setJournal(journal); - setTaskRunnerFactory(taskRunnerFactory); - setPersistenceAdapter(longTermPersistence); - } - - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { - this.taskRunnerFactory = taskRunnerFactory; - } - - public void setJournal(Journal journal) { - this.journal = journal; - journal.setJournalEventListener(this); - } - - public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) { - this.longTermPersistence = longTermPersistence; - } - - final Runnable createPeriodicCheckpointTask() { - return new Runnable() { - @Override - public void run() { - long lastTime = 0; - synchronized (this) { - lastTime = lastCheckpointRequest; - } - if (System.currentTimeMillis() > lastTime + checkpointInterval) { - checkpoint(false, true); - } - } - }; - } - - /** - * @param usageManager The UsageManager that is controlling the - * destination's memory usage. - */ - @Override - public void setUsageManager(SystemUsage usageManager) { - this.usageManager = usageManager; - longTermPersistence.setUsageManager(usageManager); - } - - @Override - public Set getDestinations() { - Set destinations = new HashSet(longTermPersistence.getDestinations()); - destinations.addAll(queues.keySet()); - destinations.addAll(topics.keySet()); - return destinations; - } - - private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException { - if (destination.isQueue()) { - return createQueueMessageStore((ActiveMQQueue)destination); - } else { - return createTopicMessageStore((ActiveMQTopic)destination); - } - } - - @Override - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - JournalMessageStore store = queues.get(destination); - if (store == null) { - MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination); - store = new JournalMessageStore(this, checkpointStore, destination); - queues.put(destination, store); - } - return store; - } - - @Override - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { - JournalTopicMessageStore store = topics.get(destinationName); - if (store == null) { - TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName); - store = new JournalTopicMessageStore(this, checkpointStore, destinationName); - topics.put(destinationName, store); - } - return store; - } - - /** - * Cleanup method to remove any state associated with the given destination - * - * @param destination Destination to forget - */ - @Override - public void removeQueueMessageStore(ActiveMQQueue destination) { - queues.remove(destination); - } - - /** - * Cleanup method to remove any state associated with the given destination - * - * @param destination Destination to forget - */ - @Override - public void removeTopicMessageStore(ActiveMQTopic destination) { - topics.remove(destination); - } - - @Override - public TransactionStore createTransactionStore() throws IOException { - return transactionStore; - } - - @Override - public long getLastMessageBrokerSequenceId() throws IOException { - return longTermPersistence.getLastMessageBrokerSequenceId(); - } - - @Override - public void beginTransaction(ConnectionContext context) throws IOException { - longTermPersistence.beginTransaction(context); - } - - @Override - public void commitTransaction(ConnectionContext context) throws IOException { - longTermPersistence.commitTransaction(context); - } - - @Override - public void rollbackTransaction(ConnectionContext context) throws IOException { - longTermPersistence.rollbackTransaction(context); - } - - @Override - public synchronized void start() throws Exception { - if (!started.compareAndSet(false, true)) { - return; - } - - if( brokerService!=null ) { - wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); - } - - checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { - @Override - public boolean iterate() { - return doCheckpoint(); - } - }, "ActiveMQ Journal Checkpoint Worker"); - - checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { - @Override - public Thread newThread(Runnable runable) { - Thread t = new Thread(runable, "Journal checkpoint worker"); - t.setPriority(7); - return t; - } - }); - // checkpointExecutor.allowCoreThreadTimeOut(true); - - this.usageManager.getMemoryUsage().addUsageListener(this); - - if (longTermPersistence instanceof JDBCPersistenceAdapter) { - // Disabled periodic clean up as it deadlocks with the checkpoint - // operations. - ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0); - } - - longTermPersistence.start(); - createTransactionStore(); - recover(); - - // Do a checkpoint periodically. - this.scheduler = new Scheduler("Journal Scheduler"); - this.scheduler.start(); - this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10); - - } - - @Override - public void stop() throws Exception { - - this.usageManager.getMemoryUsage().removeUsageListener(this); - if (!started.compareAndSet(true, false)) { - return; - } - - this.scheduler.cancel(periodicCheckpointTask); - this.scheduler.stop(); - - // Take one final checkpoint and stop checkpoint processing. - checkpoint(true, true); - checkpointTask.shutdown(); - ThreadPoolUtils.shutdown(checkpointExecutor); - checkpointExecutor = null; - - queues.clear(); - topics.clear(); - - IOException firstException = null; - try { - journal.close(); - } catch (Exception e) { - firstException = IOExceptionSupport.create("Failed to close journals: " + e, e); - } - longTermPersistence.stop(); - - if (firstException != null) { - throw firstException; - } - } - - // Properties - // ------------------------------------------------------------------------- - public PersistenceAdapter getLongTermPersistence() { - return longTermPersistence; - } - - /** - * @return Returns the wireFormat. - */ - public WireFormat getWireFormat() { - return wireFormat; - } - - // Implementation methods - // ------------------------------------------------------------------------- - - /** - * The Journal give us a call back so that we can move old data out of the - * journal. Taking a checkpoint does this for us. - * - * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) - */ - @Override - public void overflowNotification(RecordLocation safeLocation) { - checkpoint(false, true); - } - - /** - * When we checkpoint we move all the journalled data to long term storage. - * - */ - public void checkpoint(boolean sync, boolean fullCheckpoint) { - try { - if (journal == null) { - throw new IllegalStateException("Journal is closed."); - } - - long now = System.currentTimeMillis(); - CountDownLatch latch = null; - synchronized (this) { - latch = nextCheckpointCountDownLatch; - lastCheckpointRequest = now; - if (fullCheckpoint) { - this.fullCheckPoint = true; - } - } - - checkpointTask.wakeup(); - - if (sync) { - LOG.debug("Waking for checkpoint to complete."); - latch.await(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Request to start checkpoint failed: " + e, e); - } - } - - @Override - public void checkpoint(boolean sync) { - checkpoint(sync, sync); - } - - /** - * This does the actual checkpoint. - * - * @return - */ - public boolean doCheckpoint() { - CountDownLatch latch = null; - boolean fullCheckpoint; - synchronized (this) { - latch = nextCheckpointCountDownLatch; - nextCheckpointCountDownLatch = new CountDownLatch(1); - fullCheckpoint = this.fullCheckPoint; - this.fullCheckPoint = false; - } - try { - - LOG.debug("Checkpoint started."); - RecordLocation newMark = null; - - ArrayList> futureTasks = new ArrayList>(queues.size() + topics.size()); - - // - // We do many partial checkpoints (fullCheckpoint==false) to move - // topic messages - // to long term store as soon as possible. - // - // We want to avoid doing that for queue messages since removes the - // come in the same - // checkpoint cycle will nullify the previous message add. - // Therefore, we only - // checkpoint queues on the fullCheckpoint cycles. - // - if (fullCheckpoint) { - Iterator iterator = queues.values().iterator(); - while (iterator.hasNext()) { - try { - final JournalMessageStore ms = iterator.next(); - FutureTask task = new FutureTask(new Callable() { - @Override - public RecordLocation call() throws Exception { - return ms.checkpoint(); - } - }); - futureTasks.add(task); - checkpointExecutor.execute(task); - } catch (Exception e) { - LOG.error("Failed to checkpoint a message store: " + e, e); - } - } - } - - Iterator iterator = topics.values().iterator(); - while (iterator.hasNext()) { - try { - final JournalTopicMessageStore ms = iterator.next(); - FutureTask task = new FutureTask(new Callable() { - @Override - public RecordLocation call() throws Exception { - return ms.checkpoint(); - } - }); - futureTasks.add(task); - checkpointExecutor.execute(task); - } catch (Exception e) { - LOG.error("Failed to checkpoint a message store: " + e, e); - } - } - - try { - for (Iterator> iter = futureTasks.iterator(); iter.hasNext();) { - FutureTask ft = iter.next(); - RecordLocation mark = ft.get(); - // We only set a newMark on full checkpoints. - if (fullCheckpoint) { - if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) { - newMark = mark; - } - } - } - } catch (Throwable e) { - LOG.error("Failed to checkpoint a message store: " + e, e); - } - - if (fullCheckpoint) { - try { - if (newMark != null) { - LOG.debug("Marking journal at: " + newMark); - journal.setMark(newMark, true); - } - } catch (Exception e) { - LOG.error("Failed to mark the Journal: " + e, e); - } - - if (longTermPersistence instanceof JDBCPersistenceAdapter) { - // We may be check pointing more often than the - // checkpointInterval if under high use - // But we don't want to clean up the db that often. - long now = System.currentTimeMillis(); - if (now > lastCleanup + checkpointInterval) { - lastCleanup = now; - ((JDBCPersistenceAdapter)longTermPersistence).cleanup(); - } - } - } - - LOG.debug("Checkpoint done."); - } finally { - latch.countDown(); - } - synchronized (this) { - return this.fullCheckPoint; - } - - } - - /** - * @param location - * @return - * @throws IOException - */ - public DataStructure readCommand(RecordLocation location) throws IOException { - try { - Packet packet = journal.read(location); - return (DataStructure)wireFormat.unmarshal(toByteSequence(packet)); - } catch (InvalidRecordLocationException e) { - throw createReadException(location, e); - } catch (IOException e) { - throw createReadException(location, e); - } - } - - /** - * Move all the messages that were in the journal into long term storage. We - * just replay and do a checkpoint. - * - * @throws IOException - * @throws IOException - * @throws InvalidRecordLocationException - * @throws IllegalStateException - */ - private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException { - - RecordLocation pos = null; - int transactionCounter = 0; - - LOG.info("Journal Recovery Started from: " + journal); - ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); - - // While we have records in the journal. - while ((pos = journal.getNextRecordLocation(pos)) != null) { - Packet data = journal.read(pos); - DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data)); - - if (c instanceof Message) { - Message message = (Message)c; - JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination()); - if (message.isInTransaction()) { - transactionStore.addMessage(store, message, pos); - } else { - store.replayAddMessage(context, message); - transactionCounter++; - } - } else { - switch (c.getDataStructureType()) { - case JournalQueueAck.DATA_STRUCTURE_TYPE: { - JournalQueueAck command = (JournalQueueAck)c; - JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination()); - if (command.getMessageAck().isInTransaction()) { - transactionStore.removeMessage(store, command.getMessageAck(), pos); - } else { - store.replayRemoveMessage(context, command.getMessageAck()); - transactionCounter++; - } - } - break; - case JournalTopicAck.DATA_STRUCTURE_TYPE: { - JournalTopicAck command = (JournalTopicAck)c; - JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination()); - if (command.getTransactionId() != null) { - transactionStore.acknowledge(store, command, pos); - } else { - store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()); - transactionCounter++; - } - } - break; - case JournalTransaction.DATA_STRUCTURE_TYPE: { - JournalTransaction command = (JournalTransaction)c; - try { - // Try to replay the packet. - switch (command.getType()) { - case JournalTransaction.XA_PREPARE: - transactionStore.replayPrepare(command.getTransactionId()); - break; - case JournalTransaction.XA_COMMIT: - case JournalTransaction.LOCAL_COMMIT: - Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared()); - if (tx == null) { - break; // We may be trying to replay a commit - } - // that - // was already committed. - - // Replay the committed operations. - tx.getOperations(); - for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) { - TxOperation op = (TxOperation)iter.next(); - if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - op.store.replayAddMessage(context, (Message)op.data); - } - if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { - op.store.replayRemoveMessage(context, (MessageAck)op.data); - } - if (op.operationType == TxOperation.ACK_OPERATION_TYPE) { - JournalTopicAck ack = (JournalTopicAck)op.data; - ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()); - } - } - transactionCounter++; - break; - case JournalTransaction.LOCAL_ROLLBACK: - case JournalTransaction.XA_ROLLBACK: - transactionStore.replayRollback(command.getTransactionId()); - break; - default: - throw new IOException("Invalid journal command type: " + command.getType()); - } - } catch (IOException e) { - LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e); - } - } - break; - case JournalTrace.DATA_STRUCTURE_TYPE: - JournalTrace trace = (JournalTrace)c; - LOG.debug("TRACE Entry: " + trace.getMessage()); - break; - default: - LOG.error("Unknown type of record in transaction log which will be discarded: " + c); - } - } - } - - RecordLocation location = writeTraceMessage("RECOVERED", true); - journal.setMark(location, true); - - LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered."); - } - - private IOException createReadException(RecordLocation location, Exception e) { - return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e); - } - - protected IOException createWriteException(DataStructure packet, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e); - } - - protected IOException createWriteException(String command, Exception e) { - return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e); - } - - protected IOException createRecoveryFailedException(Exception e) { - return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e); - } - - /** - * @param command - * @param sync - * @return - * @throws IOException - */ - public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { - if (started.get()) { - try { - return journal.write(toPacket(wireFormat.marshal(command)), sync); - } catch (IOException ioe) { - LOG.error("Cannot write to the journal", ioe); - brokerService.handleIOException(ioe); - throw ioe; - } - } - throw new IOException("closed"); - } - - private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException { - JournalTrace trace = new JournalTrace(); - trace.setMessage(message); - return writeCommand(trace, sync); - } - - @Override - public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { - newPercentUsage = (newPercentUsage / 10) * 10; - oldPercentUsage = (oldPercentUsage / 10) * 10; - if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { - boolean sync = newPercentUsage >= 90; - checkpoint(sync, true); - } - } - - public JournalTransactionStore getTransactionStore() { - return transactionStore; - } - - @Override - public void deleteAllMessages() throws IOException { - try { - JournalTrace trace = new JournalTrace(); - trace.setMessage("DELETED"); - RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false); - journal.setMark(location, true); - LOG.info("Journal deleted: "); - } catch (IOException e) { - throw e; - } catch (Throwable e) { - throw IOExceptionSupport.create(e); - } - longTermPersistence.deleteAllMessages(); - } - - public SystemUsage getUsageManager() { - return usageManager; - } - - public int getMaxCheckpointMessageAddSize() { - return maxCheckpointMessageAddSize; - } - - public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) { - this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize; - } - - public int getMaxCheckpointWorkers() { - return maxCheckpointWorkers; - } - - public void setMaxCheckpointWorkers(int maxCheckpointWorkers) { - this.maxCheckpointWorkers = maxCheckpointWorkers; - } - - public long getCheckpointInterval() { - return checkpointInterval; - } - - public void setCheckpointInterval(long checkpointInterval) { - this.checkpointInterval = checkpointInterval; - } - - public boolean isUseExternalMessageReferences() { - return false; - } - - public void setUseExternalMessageReferences(boolean enable) { - if (enable) { - throw new IllegalArgumentException("The journal does not support message references."); - } - } - - public Packet toPacket(ByteSequence sequence) { - return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length)); - } - - public ByteSequence toByteSequence(Packet packet) { - org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence(); - return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); - } - - @Override - public void setBrokerName(String brokerName) { - longTermPersistence.setBrokerName(brokerName); - } - - @Override - public String toString() { - return "JournalPersistenceAdapter(" + longTermPersistence + ")"; - } - - @Override - public void setDirectory(File dir) { - this.directory=dir; - } - - @Override - public File getDirectory(){ - return directory; - } - - @Override - public long size(){ - return 0; - } - - @Override - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - PersistenceAdapter pa = getLongTermPersistence(); - if( pa instanceof BrokerServiceAware ) { - ((BrokerServiceAware)pa).setBrokerService(brokerService); - } - } - - @Override - public long getLastProducerSequenceId(ProducerId id) { - return -1; - } - - @Override - public void allowIOResumption() { - longTermPersistence.allowIOResumption(); - } - - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - return longTermPersistence.createJobSchedulerStore(); - } - -} diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java deleted file mode 100644 index 418fbc884e..0000000000 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.journal; - -import java.io.File; -import java.io.IOException; - -import org.apache.activeio.journal.Journal; -import org.apache.activeio.journal.active.JournalImpl; -import org.apache.activeio.journal.active.JournalLockedException; -import org.apache.activemq.broker.Locker; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.PersistenceAdapterFactory; -import org.apache.activemq.store.jdbc.DataSourceServiceSupport; -import org.apache.activemq.store.jdbc.JDBCAdapter; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.jdbc.Statements; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.util.ServiceStopper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory class that can create PersistenceAdapter objects. - * - * @deprecated Deprecated for removal as this PersistenceAdapter is no longer used and - * replaced by the JDBCPersistenceAdapter. - * - * @org.apache.xbean.XBean - * - */ -@Deprecated(forRemoval = true) -public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory { - - private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; - - private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class); - - private long checkpointInterval = 1000 * 60 * 5; - private int journalLogFileSize = 1024 * 1024 * 20; - private int journalLogFiles = 2; - private TaskRunnerFactory taskRunnerFactory; - private Journal journal; - private boolean useJournal = true; - private boolean useQuickJournal; - private File journalArchiveDirectory; - private boolean failIfJournalIsLocked; - private int journalThreadPriority = Thread.MAX_PRIORITY; - private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); - private boolean useDedicatedTaskRunner; - - public PersistenceAdapter createPersistenceAdapter() throws IOException { - jdbcPersistenceAdapter.setDataSource(getDataSource()); - - if (!useJournal) { - return jdbcPersistenceAdapter; - } - JournalPersistenceAdapter result = new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); - result.setDirectory(getDataDirectoryFile()); - result.setCheckpointInterval(getCheckpointInterval()); - return result; - - } - - public int getJournalLogFiles() { - return journalLogFiles; - } - - /** - * Sets the number of journal log files to use - */ - public void setJournalLogFiles(int journalLogFiles) { - this.journalLogFiles = journalLogFiles; - } - - public int getJournalLogFileSize() { - return journalLogFileSize; - } - - /** - * Sets the size of the journal log files - * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used - * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" - */ - public void setJournalLogFileSize(int journalLogFileSize) { - this.journalLogFileSize = journalLogFileSize; - } - - public JDBCPersistenceAdapter getJdbcAdapter() { - return jdbcPersistenceAdapter; - } - - public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) { - this.jdbcPersistenceAdapter = jdbcAdapter; - } - - public boolean isUseJournal() { - return useJournal; - } - - public long getCheckpointInterval() { - return checkpointInterval; - } - - public void setCheckpointInterval(long checkpointInterval) { - this.checkpointInterval = checkpointInterval; - } - - /** - * Enables or disables the use of the journal. The default is to use the - * journal - * - * @param useJournal - */ - public void setUseJournal(boolean useJournal) { - this.useJournal = useJournal; - } - - public boolean isUseDedicatedTaskRunner() { - return useDedicatedTaskRunner; - } - - public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { - this.useDedicatedTaskRunner = useDedicatedTaskRunner; - } - - public TaskRunnerFactory getTaskRunnerFactory() { - if (taskRunnerFactory == null) { - taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, - true, 1000, isUseDedicatedTaskRunner()); - } - return taskRunnerFactory; - } - - public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { - this.taskRunnerFactory = taskRunnerFactory; - } - - public Journal getJournal() throws IOException { - if (journal == null) { - createJournal(); - } - return journal; - } - - public void setJournal(Journal journal) { - this.journal = journal; - } - - public File getJournalArchiveDirectory() { - if (journalArchiveDirectory == null && useQuickJournal) { - journalArchiveDirectory = new File(getDataDirectoryFile(), "journal"); - } - return journalArchiveDirectory; - } - - public void setJournalArchiveDirectory(File journalArchiveDirectory) { - this.journalArchiveDirectory = journalArchiveDirectory; - } - - public boolean isUseQuickJournal() { - return useQuickJournal; - } - - /** - * Enables or disables the use of quick journal, which keeps messages in the - * journal and just stores a reference to the messages in JDBC. Defaults to - * false so that messages actually reside long term in the JDBC database. - */ - public void setUseQuickJournal(boolean useQuickJournal) { - this.useQuickJournal = useQuickJournal; - } - - public JDBCAdapter getAdapter() throws IOException { - return jdbcPersistenceAdapter.getAdapter(); - } - - public void setAdapter(JDBCAdapter adapter) { - jdbcPersistenceAdapter.setAdapter(adapter); - } - - public Statements getStatements() { - return jdbcPersistenceAdapter.getStatements(); - } - - public void setStatements(Statements statements) { - jdbcPersistenceAdapter.setStatements(statements); - } - - /** - * Sets whether or not an exclusive database lock should be used to enable - * JDBC Master/Slave. Enabled by default. - */ - public void setUseDatabaseLock(boolean useDatabaseLock) { - jdbcPersistenceAdapter.setUseLock(useDatabaseLock); - } - - public boolean isCreateTablesOnStartup() { - return jdbcPersistenceAdapter.isCreateTablesOnStartup(); - } - - /** - * Sets whether or not tables are created on startup - */ - public void setCreateTablesOnStartup(boolean createTablesOnStartup) { - jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); - } - - public int getJournalThreadPriority() { - return journalThreadPriority; - } - - /** - * Sets the thread priority of the journal thread - */ - public void setJournalThreadPriority(int journalThreadPriority) { - this.journalThreadPriority = journalThreadPriority; - } - - /** - * @throws IOException - */ - protected void createJournal() throws IOException { - File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile(); - if (failIfJournalIsLocked) { - journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, - getJournalArchiveDirectory()); - } else { - while (true) { - try { - journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, - getJournalArchiveDirectory()); - break; - } catch (JournalLockedException e) { - LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) - + " seconds for the journal to be unlocked."); - try { - Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); - } catch (InterruptedException e1) { - } - } - } - } - } - - @Override - public Locker createDefaultLocker() throws IOException { - return null; - } - - @Override - public void init() throws Exception { - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception {} - - @Override - protected void doStart() throws Exception {} -} diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java deleted file mode 100644 index 7aa61a269c..0000000000 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.journal; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; - -import org.apache.activeio.journal.RecordLocation; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStoreSubscriptionStatistics; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.util.Callback; -import org.apache.activemq.util.SubscriptionKey; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A MessageStore that uses a Journal to store it's messages. - * - * - */ -public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore { - - private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class); - - private TopicMessageStore longTermStore; - private HashMap ackedLastAckLocations = new HashMap(); - - public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, - ActiveMQTopic destinationName) { - super(adapter, checkpointStore, destinationName); - this.longTermStore = checkpointStore; - } - - @Override - public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) - throws Exception { - this.peristenceAdapter.checkpoint(true, true); - longTermStore.recoverSubscription(clientId, subscriptionName, listener); - } - - @Override - public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, - MessageRecoveryListener listener) throws Exception { - this.peristenceAdapter.checkpoint(true, true); - longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener); - - } - - @Override - public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { - return longTermStore.lookupSubscription(clientId, subscriptionName); - } - - @Override - public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { - this.peristenceAdapter.checkpoint(true, true); - longTermStore.addSubscription(subscriptionInfo, retroactive); - } - - @Override - public void addMessage(ConnectionContext context, Message message) throws IOException { - super.addMessage(context, message); - } - - /** - */ - @Override - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - final MessageId messageId, MessageAck originalAck) throws IOException { - final boolean debug = LOG.isDebugEnabled(); - - JournalTopicAck ack = new JournalTopicAck(); - ack.setDestination(destination); - ack.setMessageId(messageId); - ack.setMessageSequenceId(messageId.getBrokerSequenceId()); - ack.setSubscritionName(subscriptionName); - ack.setClientId(clientId); - ack.setTransactionId(context.getTransaction() != null - ? context.getTransaction().getTransactionId() : null); - final RecordLocation location = peristenceAdapter.writeCommand(ack, false); - - final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); - if (!context.isInTransaction()) { - if (debug) { - LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); - } - acknowledge(messageId, location, key); - } else { - if (debug) { - LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); - } - synchronized (this) { - inFlightTxLocations.add(location); - } - transactionStore.acknowledge(this, ack, location); - context.getTransaction().addSynchronization(new Synchronization() { - @Override - public void afterCommit() throws Exception { - if (debug) { - LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); - } - synchronized (JournalTopicMessageStore.this) { - inFlightTxLocations.remove(location); - acknowledge(messageId, location, key); - } - } - - @Override - public void afterRollback() throws Exception { - if (debug) { - LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); - } - synchronized (JournalTopicMessageStore.this) { - inFlightTxLocations.remove(location); - } - } - }); - } - - } - - public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, - MessageId messageId) { - try { - SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); - if (sub != null) { - longTermStore.acknowledge(context, clientId, subscritionName, messageId, null); - } - } catch (Throwable e) { - LOG.debug("Could not replay acknowledge for message '" + messageId - + "'. Message may have already been acknowledged. reason: " + e); - } - } - - /** - * @param messageId - * @param location - * @param key - */ - protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) { - synchronized (this) { - lastLocation = location; - ackedLastAckLocations.put(key, messageId); - } - } - - @Override - public RecordLocation checkpoint() throws IOException { - - final HashMap cpAckedLastAckLocations; - - // swap out the hash maps.. - synchronized (this) { - cpAckedLastAckLocations = this.ackedLastAckLocations; - this.ackedLastAckLocations = new HashMap(); - } - - return super.checkpoint(new Callback() { - @Override - public void execute() throws Exception { - - // Checkpoint the acknowledged messages. - Iterator iterator = cpAckedLastAckLocations.keySet().iterator(); - while (iterator.hasNext()) { - SubscriptionKey subscriptionKey = iterator.next(); - MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); - MessageAck ack = new MessageAck(); - ack.setMessageID(identity); - longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, - subscriptionKey.subscriptionName, identity, ack); - } - - } - }); - - } - - /** - * @return Returns the longTermStore. - */ - public TopicMessageStore getLongTermTopicMessageStore() { - return longTermStore; - } - - @Override - public void deleteSubscription(String clientId, String subscriptionName) throws IOException { - longTermStore.deleteSubscription(clientId, subscriptionName); - } - - @Override - public SubscriptionInfo[] getAllSubscriptions() throws IOException { - return longTermStore.getAllSubscriptions(); - } - - @Override - public int getMessageCount(String clientId, String subscriberName) throws IOException { - this.peristenceAdapter.checkpoint(true, true); - return longTermStore.getMessageCount(clientId, subscriberName); - } - - @Override - public long getMessageSize(String clientId, String subscriberName) throws IOException { - this.peristenceAdapter.checkpoint(true, true); - return longTermStore.getMessageSize(clientId, subscriberName); - } - - @Override - public void resetBatching(String clientId, String subscriptionName) { - longTermStore.resetBatching(clientId, subscriptionName); - } - - private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); - - @Override - public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { - return stats; - } -} diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java deleted file mode 100644 index 779bbc4595..0000000000 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java +++ /dev/null @@ -1,350 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.journal; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import javax.transaction.xa.XAException; -import org.apache.activeio.journal.RecordLocation; -import org.apache.activemq.command.JournalTopicAck; -import org.apache.activemq.command.JournalTransaction; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.store.TransactionRecoveryListener; -import org.apache.activemq.store.TransactionStore; - -/** - */ -public class JournalTransactionStore implements TransactionStore { - - private final JournalPersistenceAdapter peristenceAdapter; - private final Map inflightTransactions = new LinkedHashMap(); - private final Map preparedTransactions = new LinkedHashMap(); - private boolean doingRecover; - - public static class TxOperation { - - static final byte ADD_OPERATION_TYPE = 0; - static final byte REMOVE_OPERATION_TYPE = 1; - static final byte ACK_OPERATION_TYPE = 3; - - public byte operationType; - public JournalMessageStore store; - public Object data; - - public TxOperation(byte operationType, JournalMessageStore store, Object data) { - this.operationType = operationType; - this.store = store; - this.data = data; - } - - } - - /** - * Operations - * - * - */ - public static class Tx { - - private final RecordLocation location; - private final ArrayList operations = new ArrayList(); - - public Tx(RecordLocation location) { - this.location = location; - } - - public void add(JournalMessageStore store, Message msg) { - operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg)); - } - - public void add(JournalMessageStore store, MessageAck ack) { - operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack)); - } - - public void add(JournalTopicMessageStore store, JournalTopicAck ack) { - operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack)); - } - - public Message[] getMessages() { - ArrayList list = new ArrayList(); - for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = iter.next(); - if (op.operationType == TxOperation.ADD_OPERATION_TYPE) { - list.add(op.data); - } - } - Message rc[] = new Message[list.size()]; - list.toArray(rc); - return rc; - } - - public MessageAck[] getAcks() { - ArrayList list = new ArrayList(); - for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = iter.next(); - if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) { - list.add(op.data); - } - } - MessageAck rc[] = new MessageAck[list.size()]; - list.toArray(rc); - return rc; - } - - public ArrayList getOperations() { - return operations; - } - - } - - public JournalTransactionStore(JournalPersistenceAdapter adapter) { - this.peristenceAdapter = adapter; - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) - */ - public void prepare(TransactionId txid) throws IOException { - Tx tx = null; - synchronized (inflightTransactions) { - tx = inflightTransactions.remove(txid); - } - if (tx == null) { - return; - } - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), - true); - synchronized (preparedTransactions) { - preparedTransactions.put(txid, tx); - } - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) - */ - public void replayPrepare(TransactionId txid) throws IOException { - Tx tx = null; - synchronized (inflightTransactions) { - tx = inflightTransactions.remove(txid); - } - if (tx == null) { - return; - } - synchronized (preparedTransactions) { - preparedTransactions.put(txid, tx); - } - } - - public Tx getTx(Object txid, RecordLocation location) { - Tx tx = null; - synchronized (inflightTransactions) { - tx = inflightTransactions.get(txid); - } - if (tx == null) { - tx = new Tx(location); - inflightTransactions.put(txid, tx); - } - return tx; - } - - /** - * @throws XAException - * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) - */ - public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { - Tx tx; - if (preCommit != null) { - preCommit.run(); - } - if (wasPrepared) { - synchronized (preparedTransactions) { - tx = preparedTransactions.remove(txid); - } - } else { - synchronized (inflightTransactions) { - tx = inflightTransactions.remove(txid); - } - } - if (tx == null) { - if (postCommit != null) { - postCommit.run(); - } - return; - } - if (txid.isXATransaction()) { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, - wasPrepared), true); - } else { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, - wasPrepared), true); - } - if (postCommit != null) { - postCommit.run(); - } - } - - /** - * @throws XAException - * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) - */ - public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { - if (wasPrepared) { - synchronized (preparedTransactions) { - return preparedTransactions.remove(txid); - } - } else { - synchronized (inflightTransactions) { - return inflightTransactions.remove(txid); - } - } - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) - */ - public void rollback(TransactionId txid) throws IOException { - Tx tx = null; - synchronized (inflightTransactions) { - tx = inflightTransactions.remove(txid); - } - if (tx != null) { - synchronized (preparedTransactions) { - tx = preparedTransactions.remove(txid); - } - } - if (tx != null) { - if (txid.isXATransaction()) { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, - false), true); - } else { - peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, - txid, false), true); - } - } - } - - /** - * @throws IOException - * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) - */ - public void replayRollback(TransactionId txid) throws IOException { - boolean inflight = false; - synchronized (inflightTransactions) { - inflight = inflightTransactions.remove(txid) != null; - } - if (inflight) { - synchronized (preparedTransactions) { - preparedTransactions.remove(txid); - } - } - } - - public void start() throws Exception { - } - - public void stop() throws Exception { - } - - public synchronized void recover(TransactionRecoveryListener listener) throws IOException { - // All the in-flight transactions get rolled back.. - synchronized (inflightTransactions) { - inflightTransactions.clear(); - } - this.doingRecover = true; - try { - Map txs = null; - synchronized (preparedTransactions) { - txs = new LinkedHashMap(preparedTransactions); - } - for (Iterator iter = txs.keySet().iterator(); iter.hasNext();) { - Object txid = iter.next(); - Tx tx = txs.get(txid); - listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); - } - } finally { - this.doingRecover = false; - } - } - - /** - * @param message - * @throws IOException - */ - void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException { - Tx tx = getTx(message.getTransactionId(), location); - tx.add(store, message); - } - - /** - * @param ack - * @throws IOException - */ - public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) - throws IOException { - Tx tx = getTx(ack.getTransactionId(), location); - tx.add(store, ack); - } - - public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) { - Tx tx = getTx(ack.getTransactionId(), location); - tx.add(store, ack); - } - - public RecordLocation checkpoint() throws IOException { - // Nothing really to checkpoint.. since, we don't - // checkpoint tx operations in to long term store until they are - // committed. - // But we keep track of the first location of an operation - // that was associated with an active tx. The journal can not - // roll over active tx records. - RecordLocation rc = null; - synchronized (inflightTransactions) { - for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) { - Tx tx = iter.next(); - RecordLocation location = tx.location; - if (rc == null || rc.compareTo(location) < 0) { - rc = location; - } - } - } - synchronized (preparedTransactions) { - for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) { - Tx tx = iter.next(); - RecordLocation location = tx.location; - if (rc == null || rc.compareTo(location) < 0) { - rc = location; - } - } - return rc; - } - } - - public boolean isDoingRecover() { - return doingRecover; - } - -} diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/package.html b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/package.html deleted file mode 100755 index 380f9d7547..0000000000 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/package.html +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - -

- Message persistence using a high performance transaction log via the Journal interface. -

- - - diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml index 8b3a7495fe..2af75afee3 100644 --- a/activemq-kahadb-store/pom.xml +++ b/activemq-kahadb-store/pom.xml @@ -39,11 +39,6 @@ org.apache.activemq activemq-broker - - ${project.groupId} - activeio-core - true - org.apache.activemq.protobuf activemq-protobuf diff --git a/activemq-karaf/src/main/resources/features-core.xml b/activemq-karaf/src/main/resources/features-core.xml index dedc644e80..f8f9d1c71f 100644 --- a/activemq-karaf/src/main/resources/features-core.xml +++ b/activemq-karaf/src/main/resources/features-core.xml @@ -56,7 +56,6 @@ mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xpp3/${xpp3-bundle-version} mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xstream/${xstream-bundle-version} mvn:org.apache.aries/org.apache.aries.util/${aries-version} - mvn:org.apache.activemq/activeio-core/${activeio-version} mvn:org.codehaus.jettison/jettison/${jettison-version} mvn:com.fasterxml.jackson.core/jackson-core/${jackson-version} mvn:com.fasterxml.jackson.core/jackson-databind/${jackson-version} diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml index 1743b5d5b9..f9e9d0f2d0 100644 --- a/activemq-mqtt/pom.xml +++ b/activemq-mqtt/pom.xml @@ -40,11 +40,6 @@ activemq-broker - - ${project.groupId} - activeio-core - true - org.apache.activemq.protobuf activemq-protobuf diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index dc3ee1039b..03d29cd7a0 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -174,7 +174,6 @@ javax.resource*;resolution:=optional, javax.servlet*;resolution:=optional, com.thoughtworks.xstream*;resolution:=optional, - org.apache.activeio*;resolution:=optional, org.apache.camel*;version="${camel-version-range}";resolution:=optional, org.apache.camel.spring.handler;version="${camel-version-range}";resolution:=optional, org.apache.camel.spring.xml.handler;version="${camel-version-range}";resolution:=optional, diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index 4e786ecf1e..005f46a10d 100644 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -69,11 +69,6 @@ activemq-kahadb-store true - - ${project.groupId} - activeio-core - true - diff --git a/activemq-spring/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java b/activemq-spring/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java deleted file mode 100644 index 0964024251..0000000000 --- a/activemq-spring/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store; - -import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory; -import org.springframework.beans.factory.FactoryBean; - -/** - * Creates a default persistence model using the Journal and JDBC - * - * @org.apache.xbean.XBean element="journaledJDBC" - * - * - */ -public class PersistenceAdapterFactoryBean extends JournalPersistenceAdapterFactory implements FactoryBean { - - private PersistenceAdapter persistenceAdaptor; - - public Object getObject() throws Exception { - if (persistenceAdaptor == null) { - persistenceAdaptor = createPersistenceAdapter(); - } - return persistenceAdaptor; - } - - public Class getObjectType() { - return PersistenceAdapter.class; - } - - public boolean isSingleton() { - return false; - } - -} diff --git a/activemq-tooling/activemq-maven-plugin/pom.xml b/activemq-tooling/activemq-maven-plugin/pom.xml index 47ed601154..793d0ab236 100644 --- a/activemq-tooling/activemq-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-maven-plugin/pom.xml @@ -42,10 +42,6 @@ org.apache.activemq activemq-console - - org.apache.activemq - activeio-core - org.apache.derby derby diff --git a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml index 86efdb9d46..b00d48a2ff 100644 --- a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml @@ -43,10 +43,6 @@ org.apache.activemq activemq-console - - org.apache.activemq - activeio-core - org.apache.derby derby diff --git a/activemq-tooling/activemq-perf-maven-plugin/pom.xml b/activemq-tooling/activemq-perf-maven-plugin/pom.xml index e0afe0ffa2..e366c14362 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-perf-maven-plugin/pom.xml @@ -46,10 +46,6 @@ org.apache.activemq activemq-console - - org.apache.activemq - activeio-core - org.apache.derby derby diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 785818fc42..3b88e83af8 100644 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -74,11 +74,6 @@ org.slf4j slf4j-api - - ${project.groupId} - activeio-core - true - org.fusesource.mqtt-client mqtt-client diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java index a62ce071a5..4f5078c521 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java @@ -50,7 +50,6 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.jdbc.DefaultDatabaseLocker; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.adapter.TransactDatabaseLocker; -import org.apache.activemq.store.journal.JournalPersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.usage.SystemUsage; @@ -87,43 +86,6 @@ public class ConfigTest { * for succeeding creation. It uses the first created directory as the root. */ - /* - * This tests creating a journal persistence adapter using the persistence - * adapter factory bean - */ - @Test - public void testJournaledJDBCConfig() throws Exception { - - File journalFile = new File(JOURNAL_ROOT + "testJournaledJDBCConfig/journal"); - recursiveDelete(journalFile); - - File derbyFile = new File(DERBY_ROOT + "testJournaledJDBCConfig/derbydb"); // Default - recursiveDelete(derbyFile); - - BrokerService broker; - broker = createBroker(new FileSystemResource(CONF_ROOT + "journaledjdbc-example.xml")); - try { - assertEquals("Broker Config Error (brokerName)", "brokerJournaledJDBCConfigTest", broker.getBrokerName()); - - PersistenceAdapter adapter = broker.getPersistenceAdapter(); - - assertTrue("Should have created a journal persistence adapter", adapter instanceof JournalPersistenceAdapter); - assertTrue("Should have created a derby directory at " + derbyFile.getAbsolutePath(), derbyFile.exists()); - assertTrue("Should have created a journal directory at " + journalFile.getAbsolutePath(), journalFile.exists()); - - // Check persistence factory configurations - broker.getPersistenceAdapter(); - - assertTrue(broker.getSystemUsage().getStoreUsage().getStore() instanceof JournalPersistenceAdapter); - - LOG.info("Success"); - } finally { - if (broker != null) { - broker.stop(); - } - } - } - @Test public void testJdbcLockConfigOverride() throws Exception { @@ -291,32 +253,6 @@ public class ConfigTest { } } - /* - * This tests creating a journal persistence adapter using xbeans-spring - */ - @Test - public void testJournalConfig() throws Exception { - File journalFile = new File(JOURNAL_ROOT + "testJournalConfig/journal"); - recursiveDelete(journalFile); - - BrokerService broker; - broker = createBroker(new FileSystemResource(CONF_ROOT + "journal-example.xml")); - try { - assertEquals("Broker Config Error (brokerName)", "brokerJournalConfigTest", broker.getBrokerName()); - - PersistenceAdapter adapter = broker.getPersistenceAdapter(); - - assertTrue("Should have created a journal persistence adapter", adapter instanceof JournalPersistenceAdapter); - assertTrue("Should have created a journal directory at " + journalFile.getAbsolutePath(), journalFile.exists()); - - LOG.info("Success"); - } finally { - if (broker != null) { - broker.stop(); - } - } - } - /* * This tests creating a memory persistence adapter using xbeans-spring */ diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java index 9ab45b99bd..602c0f5050 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java @@ -61,13 +61,6 @@ public class InactiveDurableTopicTest extends TestCase { broker = new BrokerService(); //broker.setPersistenceAdapter(new KahaPersistenceAdapter()); - /* - * JournalPersistenceAdapterFactory factory = new - * JournalPersistenceAdapterFactory(); - * factory.setDataDirectoryFile(broker.getDataDirectory()); - * factory.setTaskRunnerFactory(broker.getTaskRunnerFactory()); - * factory.setUseJournal(false); broker.setPersistenceFactory(factory); - */ broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); broker.start(); connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveQueueTest.java index 866926233d..11ef88a99f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveQueueTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveQueueTest.java @@ -58,13 +58,7 @@ public class InactiveQueueTest extends TestCase { // broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File // ("TEST_STUFD"))); - /* - * JournalPersistenceAdapterFactory factory = new - * JournalPersistenceAdapterFactory(); - * factory.setDataDirectoryFile(broker.getDataDirectory()); - * factory.setTaskRunnerFactory(broker.getTaskRunnerFactory()); - * factory.setUseJournal(false); broker.setPersistenceFactory(factory); - */ + broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); broker.start(); connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JournalDurableSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JournalDurableSubscriptionTest.java deleted file mode 100644 index 8634be320e..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JournalDurableSubscriptionTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import java.io.File; -import java.io.IOException; - -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory; - -/** - * - */ -public class JournalDurableSubscriptionTest extends DurableSubscriptionTestSupport { - - protected PersistenceAdapter createPersistenceAdapter() throws IOException { - File dataDir = new File("target/test-data/durableJournal"); - JournalPersistenceAdapterFactory factory = new JournalPersistenceAdapterFactory(); - factory.setDataDirectoryFile(dataDir); - factory.setUseJournal(true); - factory.setJournalLogFileSize(1024 * 64); - return factory.createPersistenceAdapter(); - } -} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.java deleted file mode 100644 index 5a34932e12..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.xbean; - -import java.net.URI; - -import junit.framework.TestCase; - -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; - -/** - * - */ -public class JDBCPersistenceXBeanConfigTest extends TestCase { - - protected BrokerService brokerService; - - public void testConfiguredCorrectly() throws Exception { - - PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); - assertNotNull(persistenceAdapter); - assertTrue(persistenceAdapter instanceof JDBCPersistenceAdapter); - - JDBCPersistenceAdapter jpa = (JDBCPersistenceAdapter)persistenceAdapter; - assertEquals("BROKER1.", jpa.getStatements().getTablePrefix()); - - } - - protected void setUp() throws Exception { - brokerService = createBroker(); - brokerService.start(); - } - - protected void tearDown() throws Exception { - if (brokerService != null) { - brokerService.stop(); - } - } - - protected BrokerService createBroker() throws Exception { - String uri = "org/apache/activemq/xbean/jdbc-persistence-test.xml"; - return BrokerFactory.createBroker(new URI("xbean:" + uri)); - } - -} diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/store/loadtester.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/store/loadtester.xml index 6383e84f1b..37124a9812 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/store/loadtester.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/store/loadtester.xml @@ -32,10 +32,13 @@ - - - + + + @@ -43,19 +46,5 @@ - - - diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/sample-conf/journal-example.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/config/sample-conf/journal-example.xml deleted file mode 100644 index 2a79308dcf..0000000000 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/sample-conf/journal-example.xml +++ /dev/null @@ -1,60 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - target/test-data/testJournalConfig/journal - - - - - - diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/sample-conf/journaledjdbc-example.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/config/sample-conf/journaledjdbc-example.xml deleted file mode 100644 index 4a6a0db90c..0000000000 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/sample-conf/journaledjdbc-example.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - - - - - - - - - - - - - diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/console/command/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/console/command/activemq.xml index b2aaf16a09..de1220d6e5 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/console/command/activemq.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/console/command/activemq.xml @@ -26,13 +26,17 @@ - + - - - + + + - + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/usecases/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/usecases/activemq.xml index b4954975b3..3e4af155e2 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/usecases/activemq.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/usecases/activemq.xml @@ -22,9 +22,9 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> - + - + @@ -55,13 +55,13 @@ --> - - - - - + + + @@ -69,40 +69,5 @@ - - - - - - - - - - - diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq.xml index aadfa15dd2..a83ece2485 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq.xml @@ -27,7 +27,7 @@ - + - - - - - - + + + @@ -51,15 +50,5 @@ - - diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq2.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq2.xml index 78d19959f6..652968a55c 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq2.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/activemq2.xml @@ -26,7 +26,7 @@ - + @@ -43,14 +43,17 @@ - - - - + + + - \ No newline at end of file + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/jdbc-persistence-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/jdbc-persistence-test.xml deleted file mode 100644 index d00a8b980b..0000000000 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/jdbc-persistence-test.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - diff --git a/activemq-web-demo/pom.xml b/activemq-web-demo/pom.xml index 075d7fc173..1985afa609 100644 --- a/activemq-web-demo/pom.xml +++ b/activemq-web-demo/pom.xml @@ -171,16 +171,6 @@ test-jar test - - ${project.groupId} - activeio-core - - - javax.servlet - servlet-api - - - org.apache.activemq activemq-jaas diff --git a/assembly/pom.xml b/assembly/pom.xml index 067c1b245a..a5d2b0cdb8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -37,10 +37,6 @@ ${project.groupId} activemq-all - - ${project.groupId} - activeio-core - ${project.groupId} activemq-client diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml index 83dfba0f1d..f77fa1c7d2 100644 --- a/assembly/src/main/descriptors/common-bin.xml +++ b/assembly/src/main/descriptors/common-bin.xml @@ -184,7 +184,6 @@ ${pom.groupId}:activemq-pool ${pom.groupId}:activemq-partition ${pom.groupId}:activemq-shiro - ${pom.groupId}:activeio-core commons-beanutils:commons-beanutils commons-collections:commons-collections commons-io:commons-io diff --git a/assembly/src/release/examples/conf/activemq-demo.xml b/assembly/src/release/examples/conf/activemq-demo.xml index 13524dbb64..8d050d2181 100644 --- a/assembly/src/release/examples/conf/activemq-demo.xml +++ b/assembly/src/release/examples/conf/activemq-demo.xml @@ -134,15 +134,6 @@ - - 2023-03-28T14:10:59Z - 3.1.4 activemq-${project.version} Apache ActiveMQ @@ -415,18 +414,6 @@ ${project.version} war - - org.apache.activemq - activeio-core - ${activeio-version} - - - org.apache.activemq - activeio-core - ${activeio-version} - test-jar - test - org.apache.activemq activemq-openwire-generator