From ba477931a7ee6d430e0d597a45f83ceffc5282dc Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 25 Sep 2009 16:37:44 +0000 Subject: [PATCH] AMQ-2415: Move the activemq-jpa-store module into the sandbox git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@818914 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-all/pom.xml | 5 - activemq-jpa-store/pom.xml | 144 --------- .../activemq/store/jpa/JPAMessageStore.java | 189 ------------ .../store/jpa/JPAPersistenceAdapter.java | 278 ------------------ .../activemq/store/jpa/JPAReferenceStore.java | 212 ------------- .../store/jpa/JPAReferenceStoreAdapter.java | 168 ----------- .../store/jpa/JPATopicMessageStore.java | 252 ---------------- .../store/jpa/JPATopicReferenceStore.java | 259 ---------------- .../store/jpa/model/StoredMessage.java | 92 ------ .../jpa/model/StoredMessageReference.java | 104 ------- .../store/jpa/model/StoredSubscription.java | 167 ----------- .../src/main/resources/META-INF/LICENSE | 203 ------------- .../src/main/resources/META-INF/NOTICE | 12 - .../main/resources/META-INF/persistence.xml | 26 -- .../broker/store/JPARecoveryBrokerTest.java | 70 ----- .../broker/store/JPAStoreLoadTester.java | 46 --- .../broker/store/QuickJPAStoreLoadTester.java | 46 --- .../QuickJPAStoreRecoveryBrokerTest.java | 76 ----- .../QuickJPAStoreXARecoveryBrokerTest.java | 76 ----- .../src/test/resources/log4j.properties | 35 --- .../activemq/broker/store/jpabroker.xml | 44 --- .../activemq/broker/store/quickjpabroker.xml | 48 --- pom.xml | 6 - 23 files changed, 2558 deletions(-) delete mode 100755 activemq-jpa-store/pom.xml delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java delete mode 100755 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java delete mode 100644 activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java delete mode 100755 activemq-jpa-store/src/main/resources/META-INF/LICENSE delete mode 100644 activemq-jpa-store/src/main/resources/META-INF/NOTICE delete mode 100644 activemq-jpa-store/src/main/resources/META-INF/persistence.xml delete mode 100644 activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java delete mode 100644 activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java delete mode 100644 activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java delete mode 100644 activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java delete mode 100644 activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java delete mode 100644 activemq-jpa-store/src/test/resources/log4j.properties delete mode 100644 activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml delete mode 100644 activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml diff --git a/activemq-all/pom.xml b/activemq-all/pom.xml index a8ee5d79b9..dc98ef55b2 100644 --- a/activemq-all/pom.xml +++ b/activemq-all/pom.xml @@ -45,10 +45,6 @@ ${pom.groupId} activemq-optional - - ${pom.groupId} - activemq-jpa-store - ${pom.groupId} kahadb @@ -78,7 +74,6 @@ ${project.groupId}:activemq-console ${project.groupId}:activemq-jaas ${project.groupId}:activemq-optional - ${project.groupId}:activemq-jpa-store ${project.groupId}:kahadb org.apache.geronimo.specs:geronimo-jms_1.1_spec org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec diff --git a/activemq-jpa-store/pom.xml b/activemq-jpa-store/pom.xml deleted file mode 100755 index a80223ba8f..0000000000 --- a/activemq-jpa-store/pom.xml +++ /dev/null @@ -1,144 +0,0 @@ - - - - - 4.0.0 - - - org.apache.activemq - activemq-parent - 5.4-SNAPSHOT - - - activemq-jpa-store - jar - ActiveMQ :: JPA Message Store - - - - - ${pom.groupId} - activemq-core - - - ${pom.groupId} - activemq-core - test-jar - - - - org.apache.openjpa - openjpa-persistence-jdbc - - - - org.apache.derby - derby - true - - - - commons-collections - commons-collections - test - - - commons-primitives - commons-primitives - test - - - commons-pool - commons-pool - true - - - org.springframework - spring-beans - true - - - org.apache.xbean - xbean-spring - true - - - - - - - - - - - - org.apache.maven.plugins - maven-antrun-plugin - - - - process-classes - - - - - - - - - - - - - - - - - - - - - - - run - - - - - - - - diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java deleted file mode 100644 index 06fa2e9628..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAMessageStore.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.AbstractMessageStore; -import org.apache.activemq.store.jpa.model.StoredMessage; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.wireformat.WireFormat; - -public class JPAMessageStore extends AbstractMessageStore { - - protected final JPAPersistenceAdapter adapter; - protected final WireFormat wireFormat; - protected final String destinationName; - protected AtomicLong lastMessageId = new AtomicLong(-1); - - public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { - super(destination); - this.adapter = adapter; - this.destinationName = destination.getQualifiedName(); - this.wireFormat = this.adapter.getWireFormat(); - } - - public void addMessage(ConnectionContext context, Message message) throws IOException { - - EntityManager manager = adapter.beginEntityManager(context); - try { - - ByteSequence sequence = wireFormat.marshal(message); - sequence.compact(); - - StoredMessage sm = new StoredMessage(); - sm.setDestination(destinationName); - sm.setId(message.getMessageId().getBrokerSequenceId()); - sm.setMessageId(message.getMessageId().toString()); - sm.setExiration(message.getExpiration()); - sm.setData(sequence.data); - - manager.persist(sm); - - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public Message getMessage(MessageId identity) throws IOException { - Message rc; - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredMessage message = null; - if (identity.getBrokerSequenceId() != 0) { - message = manager.find(StoredMessage.class, identity.getBrokerSequenceId()); - } else { - Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1"); - query.setParameter(1, identity.toString()); - message = (StoredMessage)query.getSingleResult(); - } - - rc = (Message)wireFormat.unmarshal(new ByteSequence(message.getData())); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc; - } - - public int getMessageCount() throws IOException { - Long rc; - EntityManager manager = adapter.beginEntityManager(null); - try { - Query query = manager.createQuery("select count(m) from StoredMessage m"); - rc = (Long)query.getSingleResult(); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc.intValue(); - } - - @SuppressWarnings("unchecked") - public void recover(MessageRecoveryListener container) throws Exception { - EntityManager manager = adapter.beginEntityManager(null); - try { - Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc"); - query.setParameter(1, destinationName); - for (StoredMessage m : (List)query.getResultList()) { - Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData())); - container.recoverMessage(message); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - @SuppressWarnings("unchecked") - public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - - EntityManager manager = adapter.beginEntityManager(null); - try { - - Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); - query.setParameter(1, destinationName); - query.setParameter(2, lastMessageId.get()); - query.setMaxResults(maxReturned); - int count = 0; - for (StoredMessage m : (List)query.getResultList()) { - Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData())); - listener.recoverMessage(message); - lastMessageId.set(m.getId()); - count++; - if (count >= maxReturned) { - return; - } - } - - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void removeAllMessages(ConnectionContext context) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1"); - query.setParameter(1, destinationName); - query.executeUpdate(); - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - Query query = manager.createQuery("delete from StoredMessage m where m.id=?1"); - query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId()); - query.executeUpdate(); - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public void resetBatching() { - lastMessageId.set(-1); - } -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java deleted file mode 100755 index 066bc32e9c..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import javax.persistence.EntityManager; -import javax.persistence.EntityManagerFactory; -import javax.persistence.Persistence; -import javax.persistence.Query; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.memory.MemoryTransactionStore; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * An implementation of {@link PersistenceAdapter} that uses JPA to store it's - * messages. - * - * @org.apache.xbean.XBean element="jpaPersistenceAdapter" - * @version $Revision: 1.17 $ - */ -public class JPAPersistenceAdapter implements PersistenceAdapter { - - String entityManagerName = "activemq"; - Properties entityManagerProperties = System.getProperties(); - EntityManagerFactory entityManagerFactory; - private WireFormat wireFormat; - private MemoryTransactionStore transactionStore; - - public void beginTransaction(ConnectionContext context) throws IOException { - if (context.getLongTermStoreContext() != null) { - throw new IOException("Transation already started."); - } - EntityManager manager = getEntityManagerFactory().createEntityManager(); - manager.getTransaction().begin(); - context.setLongTermStoreContext(manager); - } - - public void commitTransaction(ConnectionContext context) throws IOException { - EntityManager manager = (EntityManager)context.getLongTermStoreContext(); - if (manager == null) { - throw new IOException("Transation not started."); - } - context.setLongTermStoreContext(null); - manager.getTransaction().commit(); - manager.close(); - } - - public void rollbackTransaction(ConnectionContext context) throws IOException { - EntityManager manager = (EntityManager)context.getLongTermStoreContext(); - if (manager == null) { - throw new IOException("Transation not started."); - } - context.setLongTermStoreContext(null); - manager.getTransaction().rollback(); - manager.close(); - } - - public EntityManager beginEntityManager(ConnectionContext context) { - if (context == null || context.getLongTermStoreContext() == null) { - EntityManager manager = getEntityManagerFactory().createEntityManager(); - manager.getTransaction().begin(); - return manager; - } else { - return (EntityManager)context.getLongTermStoreContext(); - } - } - - public void commitEntityManager(ConnectionContext context, EntityManager manager) { - if (context == null || context.getLongTermStoreContext() == null) { - manager.getTransaction().commit(); - manager.close(); - } - } - - public void rollbackEntityManager(ConnectionContext context, EntityManager manager) { - if (context == null || context.getLongTermStoreContext() == null) { - manager.getTransaction().rollback(); - manager.close(); - } - } - - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - MessageStore rc = new JPAMessageStore(this, destination); - if (transactionStore != null) { - rc = transactionStore.proxy(rc); - } - return rc; - } - - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - TopicMessageStore rc = new JPATopicMessageStore(this, destination); - if (transactionStore != null) { - rc = transactionStore.proxy(rc); - } - return rc; - } - - /** - * Cleanup method to remove any state associated with the given destination - * - * @param destination Destination to forget - */ - public void removeQueueMessageStore(ActiveMQQueue destination) { - } - - /** - * Cleanup method to remove any state associated with the given destination - * - * @param destination Destination to forget - */ - public void removeTopicMessageStore(ActiveMQTopic destination) { - } - - public TransactionStore createTransactionStore() throws IOException { - if (transactionStore == null) { - transactionStore = new MemoryTransactionStore(this); - } - return this.transactionStore; - } - - public void deleteAllMessages() throws IOException { - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("delete from StoredMessage m"); - query.executeUpdate(); - query = manager.createQuery("delete from StoredSubscription ss"); - query.executeUpdate(); - } catch (Throwable e) { - rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - commitEntityManager(null, manager); - } - - public Set getDestinations() { - HashSet rc = new HashSet(); - - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("select distinct m.destination from StoredMessage m"); - for (String dest : (List)query.getResultList()) { - rc.add(ActiveMQDestination.createDestination(dest, ActiveMQDestination.QUEUE_TYPE)); - } - } catch (RuntimeException e) { - rollbackEntityManager(null, manager); - throw e; - } - commitEntityManager(null, manager); - return rc; - } - - public long getLastMessageBrokerSequenceId() throws IOException { - long rc = 0; - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("select max(m.id) from StoredMessage m"); - Long t = (Long)query.getSingleResult(); - if (t != null) { - rc = t; - } - } catch (Throwable e) { - rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - commitEntityManager(null, manager); - return rc; - } - - public boolean isUseExternalMessageReferences() { - return false; - } - - public void setUsageManager(SystemUsage usageManager) { - } - - public void start() throws Exception { - } - - public void stop() throws Exception { - if (entityManagerFactory != null && entityManagerFactory.isOpen()) { - entityManagerFactory.close(); - } - } - - public EntityManagerFactory getEntityManagerFactory() { - if (entityManagerFactory == null) { - entityManagerFactory = createEntityManagerFactory(); - } - return entityManagerFactory; - } - - protected EntityManagerFactory createEntityManagerFactory() { - return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties()); - } - - public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) { - this.entityManagerFactory = entityManagerFactory; - } - - public Properties getEntityManagerProperties() { - return entityManagerProperties; - } - - public void setEntityManagerProperties(Properties entityManagerProperties) { - this.entityManagerProperties = entityManagerProperties; - } - - public String getEntityManagerName() { - return entityManagerName; - } - - public void setEntityManagerName(String entityManager) { - this.entityManagerName = entityManager; - } - - public WireFormat getWireFormat() { - if (wireFormat == null) { - wireFormat = createWireFormat(); - } - return wireFormat; - } - - private WireFormat createWireFormat() { - OpenWireFormatFactory wff = new OpenWireFormatFactory(); - return wff.createWireFormat(); - } - - public void setWireFormat(WireFormat wireFormat) { - this.wireFormat = wireFormat; - } - - public void checkpoint(boolean sync) throws IOException { - } - - public void setBrokerName(String brokerName) { - } - - public void setDirectory(File dir) { - } - - public long size(){ - return 0; - } - -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java deleted file mode 100644 index 19dd4fdeeb..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStore.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.ReferenceStore; -import org.apache.activemq.store.AbstractMessageStore; -import org.apache.activemq.store.jpa.model.StoredMessageReference; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.wireformat.WireFormat; - -public class JPAReferenceStore extends AbstractMessageStore implements ReferenceStore { - - protected final JPAPersistenceAdapter adapter; - protected final WireFormat wireFormat; - protected final String destinationName; - protected AtomicLong lastMessageId = new AtomicLong(-1); - protected final Lock lock = new ReentrantLock(); - - public JPAReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { - super(destination); - this.adapter = adapter; - this.destinationName = destination.getQualifiedName(); - this.wireFormat = this.adapter.getWireFormat(); - } - - public Lock getStoreLock() { - return lock; - } - - public void addMessage(ConnectionContext context, Message message) throws IOException { - throw new RuntimeException("Use addMessageReference instead"); - } - - public Message getMessage(MessageId identity) throws IOException { - throw new RuntimeException("Use addMessageReference instead"); - } - - public boolean addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - - StoredMessageReference sm = new StoredMessageReference(); - sm.setDestination(destinationName); - sm.setId(messageId.getBrokerSequenceId()); - sm.setMessageId(messageId.toString()); - sm.setExiration(data.getExpiration()); - sm.setFileId(data.getFileId()); - sm.setOffset(data.getOffset()); - - manager.persist(sm); - - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - return true; - } - - public ReferenceData getMessageReference(MessageId identity) throws IOException { - ReferenceData rc = null; - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredMessageReference message = null; - if (identity.getBrokerSequenceId() != 0) { - message = manager.find(StoredMessageReference.class, identity.getBrokerSequenceId()); - } else { - Query query = manager.createQuery("select m from StoredMessageReference m where m.messageId=?1"); - query.setParameter(1, identity.toString()); - message = (StoredMessageReference)query.getSingleResult(); - } - if (message != null) { - rc = new ReferenceData(); - rc.setExpiration(message.getExiration()); - rc.setFileId(message.getFileId()); - rc.setOffset(message.getOffset()); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc; - } - - public int getMessageCount() throws IOException { - Long rc; - EntityManager manager = adapter.beginEntityManager(null); - try { - Query query = manager.createQuery("select count(m) from StoredMessageReference m"); - rc = (Long)query.getSingleResult(); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc.intValue(); - } - - public void recover(MessageRecoveryListener container) throws Exception { - EntityManager manager = adapter.beginEntityManager(null); - try { - Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 order by m.id asc"); - query.setParameter(1, destinationName); - for (StoredMessageReference m : (List)query.getResultList()) { - MessageId id = new MessageId(m.getMessageId()); - id.setBrokerSequenceId(m.getId()); - container.recoverMessageReference(id); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { - - EntityManager manager = adapter.beginEntityManager(null); - try { - - Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc"); - query.setParameter(1, destinationName); - query.setParameter(2, lastMessageId.get()); - query.setMaxResults(maxReturned); - int count = 0; - for (StoredMessageReference m : (List)query.getResultList()) { - MessageId id = new MessageId(m.getMessageId()); - id.setBrokerSequenceId(m.getId()); - listener.recoverMessageReference(id); - lastMessageId.set(m.getId()); - count++; - if (count >= maxReturned) { - return; - } - } - - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void removeAllMessages(ConnectionContext context) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - Query query = manager.createQuery("delete from StoredMessageReference m where m.destination=?1"); - query.setParameter(1, destinationName); - query.executeUpdate(); - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - Query query = manager.createQuery("delete from StoredMessageReference m where m.id=?1"); - query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId()); - query.executeUpdate(); - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public void resetBatching() { - lastMessageId.set(-1); - } - - public void setBatch(MessageId startAfter) { - } - - public boolean supportsExternalBatchControl() { - return false; - } -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java deleted file mode 100644 index 36357421fe..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAReferenceStoreAdapter.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.ReferenceStore; -import org.apache.activemq.store.ReferenceStoreAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.store.amq.AMQTx; -import org.apache.activemq.util.IOExceptionSupport; - -/** - * An implementation of {@link ReferenceStoreAdapter} that uses JPA to store - * it's message references. - * - * @org.apache.xbean.XBean element="jpaReferenceStoreAdapter" - * @version $Revision: 1.17 $ - */ -public class JPAReferenceStoreAdapter extends JPAPersistenceAdapter implements ReferenceStoreAdapter { - - @Override - public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - throw new RuntimeException("Use createQueueReferenceStore instead."); - } - - @Override - public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - throw new RuntimeException("Use createTopicReferenceStore instead."); - } - - public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { - JPAReferenceStore rc = new JPAReferenceStore(this, destination); - return rc; - } - - public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { - JPATopicReferenceStore rc = new JPATopicReferenceStore(this, destination); - return rc; - } - - public void deleteAllMessages() throws IOException { - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("delete from StoredMessageReference m"); - query.executeUpdate(); - query = manager.createQuery("delete from StoredSubscription ss"); - query.executeUpdate(); - } catch (Throwable e) { - rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - commitEntityManager(null, manager); - } - - public Set getDestinations() { - HashSet rc = new HashSet(); - - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("select distinct m.destination from StoredMessageReference m"); - for (String dest : (List)query.getResultList()) { - rc.add(ActiveMQDestination.createDestination(dest, ActiveMQDestination.QUEUE_TYPE)); - } - } catch (RuntimeException e) { - rollbackEntityManager(null, manager); - throw e; - } - commitEntityManager(null, manager); - return rc; - } - - public long getLastMessageBrokerSequenceId() throws IOException { - long rc = 0; - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("select max(m.id) from StoredMessageReference m"); - Long t = (Long)query.getSingleResult(); - if (t != null) { - rc = t; - } - } catch (Throwable e) { - rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - commitEntityManager(null, manager); - return rc; - } - - public Set getReferenceFileIdsInUse() throws IOException { - HashSet rc = null; - EntityManager manager = beginEntityManager(null); - try { - Query query = manager.createQuery("select distinct m.fileId from StoredMessageReference m"); - rc = new HashSet((List)query.getResultList()); - } catch (Throwable e) { - rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - commitEntityManager(null, manager); - return rc; - } - - /** - * @return - * @see org.apache.activemq.store.ReferenceStoreAdapter#isStoreValid() - */ - public boolean isStoreValid() { - return false; - } - - /** - * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages() - */ - public void clearMessages() { - } - - /** - * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState() - */ - public void recoverState() { - } - - public Map retrievePreparedState() throws IOException { - return null; - } - - public void savePreparedState(Map map) throws IOException { - } - - public long getMaxDataFileLength() { - return 0; - } - - public void setMaxDataFileLength(long maxDataFileLength) { - } - - public void recoverSubscription(SubscriptionInfo info) throws IOException { - } -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java deleted file mode 100644 index 04fa484a99..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.jpa.model.StoredMessage; -import org.apache.activemq.store.jpa.model.StoredSubscription; -import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOExceptionSupport; - -public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore { - private Map subscriberLastMessageMap = new ConcurrentHashMap(); - - public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { - super(adapter, destination); - } - - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - ss.setLastAckedId(messageId.getBrokerSequenceId()); - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredSubscription ss = new StoredSubscription(); - ss.setClientId(info.getClientId()); - ss.setSubscriptionName(info.getSubscriptionName()); - ss.setDestination(destinationName); - ss.setSelector(info.getSelector()); - ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName()); - ss.setLastAckedId(-1); - - if (!retroactive) { - Query query = manager.createQuery("select max(m.id) from StoredMessage m"); - Long rc = (Long)query.getSingleResult(); - if (rc != null) { - ss.setLastAckedId(rc); - } - } - - manager.persist(ss); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void deleteSubscription(String clientId, String subscriptionName) throws IOException { - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - manager.remove(ss); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) { - Query query = manager.createQuery("select ss from StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3"); - query.setParameter(1, clientId); - query.setParameter(2, subscriptionName); - query.setParameter(3, destinationName); - List resultList = query.getResultList(); - if (resultList.isEmpty()) { - return null; - } - return resultList.get(0); - } - - public SubscriptionInfo[] getAllSubscriptions() throws IOException { - SubscriptionInfo rc[]; - EntityManager manager = adapter.beginEntityManager(null); - try { - ArrayList l = new ArrayList(); - - Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1"); - query.setParameter(1, destinationName); - for (StoredSubscription ss : (List)query.getResultList()) { - SubscriptionInfo info = new SubscriptionInfo(); - info.setClientId(ss.getClientId()); - info.setDestination(destination); - info.setSelector(ss.getSelector()); - info.setSubscriptionName(ss.getSubscriptionName()); - info.setSubscribedDestination(toSubscribedDestination(ss)); - l.add(info); - } - - rc = new SubscriptionInfo[l.size()]; - l.toArray(rc); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc; - } - - public int getMessageCount(String clientId, String subscriptionName) throws IOException { - Long rc; - EntityManager manager = adapter.beginEntityManager(null); - try { - Query query = manager.createQuery("select count(m) FROM StoredMessage m, StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3 " - + "and m.destination=ss.destination and m.id > ss.lastAckedId"); - query.setParameter(1, clientId); - query.setParameter(2, subscriptionName); - query.setParameter(3, destinationName); - rc = (Long)query.getSingleResult(); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc.intValue(); - } - - public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { - SubscriptionInfo rc = null; - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - if (ss != null) { - rc = new SubscriptionInfo(); - rc.setClientId(ss.getClientId()); - rc.setDestination(destination); - rc.setSelector(ss.getSelector()); - rc.setSubscriptionName(ss.getSubscriptionName()); - rc.setSubscribedDestination(toSubscribedDestination(ss)); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc; - } - - private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) { - if (ss.getSubscribedDestination() == null) { - return null; - } - return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE); - } - - public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { - EntityManager manager = adapter.beginEntityManager(null); - try { - SubscriptionId id = new SubscriptionId(); - id.setClientId(clientId); - id.setSubscriptionName(subscriptionName); - id.setDestination(destinationName); - - AtomicLong last = subscriberLastMessageMap.get(id); - if (last == null) { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - last = new AtomicLong(ss.getLastAckedId()); - subscriberLastMessageMap.put(id, last); - } - final AtomicLong lastMessageId = last; - - Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); - query.setParameter(1, destinationName); - query.setParameter(2, lastMessageId.get()); - query.setMaxResults(maxReturned); - int count = 0; - for (StoredMessage m : (List)query.getResultList()) { - Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData())); - listener.recoverMessage(message); - lastMessageId.set(m.getId()); - count++; - if (count >= maxReturned) { - return; - } - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { - EntityManager manager = adapter.beginEntityManager(null); - try { - - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - - Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc"); - query.setParameter(1, destinationName); - query.setParameter(2, ss.getLastAckedId()); - for (StoredMessage m : (List)query.getResultList()) { - Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData())); - listener.recoverMessage(message); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void resetBatching(String clientId, String subscriptionName) { - SubscriptionId id = new SubscriptionId(); - id.setClientId(clientId); - id.setSubscriptionName(subscriptionName); - id.setDestination(destinationName); - - subscriberLastMessageMap.remove(id); - } - -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java deleted file mode 100644 index 738dff420c..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.store.jpa.model.StoredMessageReference; -import org.apache.activemq.store.jpa.model.StoredSubscription; -import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId; -import org.apache.activemq.util.IOExceptionSupport; - -public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore { - private Map subscriberLastMessageMap = new ConcurrentHashMap(); - - public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) { - super(adapter, destination); - } - - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { - EntityManager manager = adapter.beginEntityManager(context); - try { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - ss.setLastAckedId(messageId.getBrokerSequenceId()); - } catch (Throwable e) { - adapter.rollbackEntityManager(context, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(context, manager); - } - - public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredSubscription ss = new StoredSubscription(); - ss.setClientId(info.getClientId()); - ss.setSubscriptionName(info.getSubcriptionName()); - ss.setDestination(destinationName); - ss.setSelector(info.getSelector()); - ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName()); - ss.setLastAckedId(-1); - - if (!retroactive) { - Query query = manager.createQuery("select max(m.id) from StoredMessageReference m"); - Long rc = (Long)query.getSingleResult(); - if (rc != null) { - ss.setLastAckedId(rc); - } - } - - manager.persist(ss); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void deleteSubscription(String clientId, String subscriptionName) throws IOException { - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - manager.remove(ss); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) { - Query query = manager.createQuery("select ss from StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3"); - query.setParameter(1, clientId); - query.setParameter(2, subscriptionName); - query.setParameter(3, destinationName); - List resultList = query.getResultList(); - if (resultList.isEmpty()) { - return null; - } - return resultList.get(0); - } - - public SubscriptionInfo[] getAllSubscriptions() throws IOException { - SubscriptionInfo rc[]; - EntityManager manager = adapter.beginEntityManager(null); - try { - ArrayList l = new ArrayList(); - - Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1"); - query.setParameter(1, destinationName); - for (StoredSubscription ss : (List)query.getResultList()) { - SubscriptionInfo info = new SubscriptionInfo(); - info.setClientId(ss.getClientId()); - info.setDestination(destination); - info.setSelector(ss.getSelector()); - info.setSubscriptionName(ss.getSubscriptionName()); - info.setSubscribedDestination(toSubscribedDestination(ss)); - l.add(info); - } - - rc = new SubscriptionInfo[l.size()]; - l.toArray(rc); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc; - } - - private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) { - if (ss.getSubscribedDestination() == null) { - return null; - } - return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE); - } - - public int getMessageCount(String clientId, String subscriptionName) throws IOException { - Long rc; - EntityManager manager = adapter.beginEntityManager(null); - try { - Query query = manager.createQuery("select count(m) FROM StoredMessageReference m, StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " - + "and ss.destination=?3 " + "and m.destination=ss.destination and m.id > ss.lastAckedId"); - query.setParameter(1, clientId); - query.setParameter(2, subscriptionName); - query.setParameter(3, destinationName); - rc = (Long)query.getSingleResult(); - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc.intValue(); - } - - public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { - SubscriptionInfo rc = null; - EntityManager manager = adapter.beginEntityManager(null); - try { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - if (ss != null) { - rc = new SubscriptionInfo(); - rc.setClientId(ss.getClientId()); - rc.setDestination(destination); - rc.setSelector(ss.getSelector()); - rc.setSubscriptionName(ss.getSubscriptionName()); - rc.setSubscribedDestination(toSubscribedDestination(ss)); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - return rc; - } - - public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { - EntityManager manager = adapter.beginEntityManager(null); - try { - SubscriptionId id = new SubscriptionId(); - id.setClientId(clientId); - id.setSubscriptionName(subscriptionName); - id.setDestination(destinationName); - - AtomicLong last = subscriberLastMessageMap.get(id); - if (last == null) { - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - last = new AtomicLong(ss.getLastAckedId()); - subscriberLastMessageMap.put(id, last); - } - final AtomicLong lastMessageId = last; - - Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc"); - query.setParameter(1, destinationName); - query.setParameter(2, lastMessageId.get()); - query.setMaxResults(maxReturned); - int count = 0; - for (StoredMessageReference m : (List)query.getResultList()) { - MessageId mid = new MessageId(m.getMessageId()); - mid.setBrokerSequenceId(m.getId()); - listener.recoverMessageReference(mid); - - lastMessageId.set(m.getId()); - count++; - if (count >= maxReturned) { - return; - } - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { - EntityManager manager = adapter.beginEntityManager(null); - try { - - StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName); - - Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc"); - query.setParameter(1, destinationName); - query.setParameter(2, ss.getLastAckedId()); - for (StoredMessageReference m : (List)query.getResultList()) { - MessageId mid = new MessageId(m.getMessageId()); - mid.setBrokerSequenceId(m.getId()); - listener.recoverMessageReference(mid); - } - } catch (Throwable e) { - adapter.rollbackEntityManager(null, manager); - throw IOExceptionSupport.create(e); - } - adapter.commitEntityManager(null, manager); - } - - public void resetBatching(String clientId, String subscriptionName) { - SubscriptionId id = new SubscriptionId(); - id.setClientId(clientId); - id.setSubscriptionName(subscriptionName); - id.setDestination(destinationName); - subscriberLastMessageMap.remove(id); - } - - public boolean acknowledgeReference(ConnectionContext context, - String clientId, String subscriptionName, MessageId messageId) - throws IOException { - acknowledge(context, clientId, subscriptionName, messageId); - return true; - } - -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java deleted file mode 100644 index a6fcb5b350..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessage.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa.model; - -import javax.persistence.Basic; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Lob; - -import org.apache.openjpa.persistence.jdbc.Index; - -/** - */ -@Entity() -public class StoredMessage { - - @Id - private long id; - - @Basic(optional = false) - @Index(enabled = true, unique = false) - private String messageId; - - @Basic(optional = false) - @Index(enabled = true, unique = false) - private String destination; - - @Basic - private long exiration; - - @Basic - @Lob - private byte[] data; - - public StoredMessage() { - } - - public byte[] getData() { - return data; - } - - public void setData(byte[] data) { - this.data = data; - } - - public String getDestination() { - return destination; - } - - public void setDestination(String destination) { - this.destination = destination; - } - - public long getExiration() { - return exiration; - } - - public void setExiration(long exiration) { - this.exiration = exiration; - } - - public String getMessageId() { - return messageId; - } - - public void setMessageId(String messageId) { - this.messageId = messageId; - } - - public long getId() { - return id; - } - - public void setId(long sequenceId) { - this.id = sequenceId; - } - -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java deleted file mode 100644 index ea7e1f84ea..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredMessageReference.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa.model; - -import javax.persistence.Basic; -import javax.persistence.Entity; -import javax.persistence.Id; - -import org.apache.openjpa.persistence.jdbc.Index; - -/** - */ -@Entity() -public class StoredMessageReference { - - @Id - private long id; - - @Basic(optional = false) - @Index(enabled = true, unique = false) - private String messageId; - - @Basic(optional = false) - @Index(enabled = true, unique = false) - private String destination; - - @Basic - @Index(enabled = false, unique = false) - private long exiration; - - @Basic(optional = false) - @Index(enabled = false, unique = false) - private int fileId; - - @Basic(optional = false) - @Index(enabled = false, unique = false) - private int offset; - - public StoredMessageReference() { - } - - public String getDestination() { - return destination; - } - - public void setDestination(String destination) { - this.destination = destination; - } - - public long getExiration() { - return exiration; - } - - public void setExiration(long exiration) { - this.exiration = exiration; - } - - public String getMessageId() { - return messageId; - } - - public void setMessageId(String messageId) { - this.messageId = messageId; - } - - public long getId() { - return id; - } - - public void setId(long sequenceId) { - this.id = sequenceId; - } - - public int getFileId() { - return fileId; - } - - public void setFileId(int fileId) { - this.fileId = fileId; - } - - public int getOffset() { - return offset; - } - - public void setOffset(int offset) { - this.offset = offset; - } - -} diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java deleted file mode 100644 index 2f84b0f8bd..0000000000 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.jpa.model; - -import javax.persistence.Basic; -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; - -import org.apache.openjpa.persistence.jdbc.Index; - -/** - */ -@Entity -public class StoredSubscription { - - /** - * Application identity class for Magazine. - */ - public static class SubscriptionId { - - public String destination; - public String clientId; - public String subscriptionName; - - public boolean equals(Object other) { - if (other == this) { - return true; - } - if (!(other instanceof SubscriptionId)) { - return false; - } - - SubscriptionId sid = (SubscriptionId)other; - return (destination == sid.destination || (destination != null && destination.equals(sid.destination))) - && (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId))) - && (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName))); - } - - /** - * Hashcode must also depend on identity values. - */ - public int hashCode() { - return ((destination == null) ? 0 : destination.hashCode()) ^ ((clientId == null) ? 0 : clientId.hashCode()) ^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode()); - } - - public String toString() { - return destination + ":" + clientId + ":" + subscriptionName; - } - - public String getClientId() { - return clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public String getDestination() { - return destination; - } - - public void setDestination(String destination) { - this.destination = destination; - } - - public String getSubscriptionName() { - return subscriptionName; - } - - public void setSubscriptionName(String subscriptionName) { - this.subscriptionName = subscriptionName; - } - } - - @Id - @GeneratedValue(strategy = GenerationType.AUTO) - private long id; - - @Basic - @Index(enabled = true, unique = false) - private String destination; - @Basic - @Index(enabled = true, unique = false) - private String clientId; - @Basic - @Index(enabled = true, unique = false) - private String subscriptionName; - - @Basic - private long lastAckedId; - @Basic - private String selector; - @Basic - private String subscribedDestination; - - public long getLastAckedId() { - return lastAckedId; - } - - public void setLastAckedId(long lastAckedId) { - this.lastAckedId = lastAckedId; - } - - public String getSelector() { - return selector; - } - - public void setSelector(String selector) { - this.selector = selector; - } - - public String getDestination() { - return destination; - } - - public void setDestination(String destination) { - this.destination = destination; - } - - public String getClientId() { - return clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public String getSubscriptionName() { - return subscriptionName; - } - - public void setSubscriptionName(String subscriptionName) { - this.subscriptionName = subscriptionName; - } - - public long getId() { - return id; - } - - public void setId(long id) { - this.id = id; - } - - public String getSubscribedDestination() { - return subscribedDestination; - } - - public void setSubscribedDestination(String subscribedDestination) { - this.subscribedDestination = subscribedDestination; - } -} diff --git a/activemq-jpa-store/src/main/resources/META-INF/LICENSE b/activemq-jpa-store/src/main/resources/META-INF/LICENSE deleted file mode 100755 index 6b0b1270ff..0000000000 --- a/activemq-jpa-store/src/main/resources/META-INF/LICENSE +++ /dev/null @@ -1,203 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - diff --git a/activemq-jpa-store/src/main/resources/META-INF/NOTICE b/activemq-jpa-store/src/main/resources/META-INF/NOTICE deleted file mode 100644 index 68601536a7..0000000000 --- a/activemq-jpa-store/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,12 +0,0 @@ -========================================================================= -== NOTICE file corresponding to the section 4 d of == -== the Apache License, Version 2.0, == -== in this case for the Apache ActiveMQ distribution. == -========================================================================= - -Apache ActiveMQ -Copyright 2005-2006 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). - diff --git a/activemq-jpa-store/src/main/resources/META-INF/persistence.xml b/activemq-jpa-store/src/main/resources/META-INF/persistence.xml deleted file mode 100644 index 5483a6e58c..0000000000 --- a/activemq-jpa-store/src/main/resources/META-INF/persistence.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - org.apache.openjpa.persistence.PersistenceProviderImpl - org.apache.activemq.store.jpa.model.StoredMessage - org.apache.activemq.store.jpa.model.StoredSubscription - org.apache.activemq.store.jpa.model.StoredMessageReference - - diff --git a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java b/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java deleted file mode 100644 index f52d32f820..0000000000 --- a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPARecoveryBrokerTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.store; - -import java.util.Properties; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.RecoveryBrokerTest; -import org.apache.activemq.store.jpa.JPAPersistenceAdapter; - -/** - * Used to verify that recovery works correctly against - * - * @version $Revision$ - */ -public class JPARecoveryBrokerTest extends RecoveryBrokerTest { - - protected BrokerService createBroker() throws Exception { - BrokerService service = new BrokerService(); - service.setDeleteAllMessagesOnStartup(true); - JPAPersistenceAdapter pa = new JPAPersistenceAdapter(); - Properties props = new Properties(); - props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); - props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); -// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); - pa.setEntityManagerProperties(props); - service.setPersistenceAdapter(pa); - return service; - - } - - protected BrokerService createRestartedBroker() throws Exception { - BrokerService service = new BrokerService(); - JPAPersistenceAdapter pa = new JPAPersistenceAdapter(); - Properties props = new Properties(); - props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); - props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); -// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); - pa.setEntityManagerProperties(props); - service.setPersistenceAdapter(pa); - return service; - } - - public static Test suite() { - return suite(JPARecoveryBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} diff --git a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java b/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java deleted file mode 100644 index cc58e3d69b..0000000000 --- a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/JPAStoreLoadTester.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.store; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.xbean.BrokerFactoryBean; -import org.springframework.core.io.ClassPathResource; - -/** - * @version $Revision$ - */ -public class JPAStoreLoadTester extends LoadTester { - - protected BrokerService createBroker() throws Exception { - BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/jpabroker.xml")); - brokerFactory.afterPropertiesSet(); - BrokerService broker = brokerFactory.getBroker(); - broker.setDeleteAllMessagesOnStartup(true); - return broker; - } - - public static Test suite() { - return suite(JPAStoreLoadTester.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} diff --git a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java b/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java deleted file mode 100644 index e5fc38e45d..0000000000 --- a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreLoadTester.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.store; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.xbean.BrokerFactoryBean; -import org.springframework.core.io.ClassPathResource; - -/** - * @version $Revision$ - */ -public class QuickJPAStoreLoadTester extends LoadTester { - - protected BrokerService createBroker() throws Exception { - BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/quickjpabroker.xml")); - brokerFactory.afterPropertiesSet(); - BrokerService broker = brokerFactory.getBroker(); - broker.setDeleteAllMessagesOnStartup(true); - return broker; - } - - public static Test suite() { - return suite(QuickJPAStoreLoadTester.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} diff --git a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java b/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java deleted file mode 100644 index 5e954f2ed5..0000000000 --- a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreRecoveryBrokerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.store; - -import java.util.Properties; -import junit.framework.Test; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.RecoveryBrokerTest; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; -import org.apache.activemq.store.jpa.JPAReferenceStoreAdapter; - -/** - * Used to verify that recovery works correctly against - * - * @version $Revision$ - */ -public class QuickJPAStoreRecoveryBrokerTest extends RecoveryBrokerTest { - - protected BrokerService createBroker() throws Exception { - BrokerService service = new BrokerService(); - service.setDeleteAllMessagesOnStartup(true); - AMQPersistenceAdapter pa = new AMQPersistenceAdapter(); - - JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter(); - Properties props = new Properties(); - props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); - props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); -// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); - rfa.setEntityManagerProperties(props); - pa.setReferenceStoreAdapter(rfa); - - service.setPersistenceAdapter(pa); - return service; - } - - protected BrokerService createRestartedBroker() throws Exception { - BrokerService service = new BrokerService(); - AMQPersistenceAdapter pa = new AMQPersistenceAdapter(); - - JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter(); - Properties props = new Properties(); - props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); - props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); -// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); - rfa.setEntityManagerProperties(props); - pa.setReferenceStoreAdapter(rfa); - - service.setPersistenceAdapter(pa); - return service; - } - - public static Test suite() { - return suite(QuickJPAStoreRecoveryBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} diff --git a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java b/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java deleted file mode 100644 index d989957ee1..0000000000 --- a/activemq-jpa-store/src/test/java/org/apache/activemq/broker/store/QuickJPAStoreXARecoveryBrokerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.store; - -import java.util.Properties; -import junit.framework.Test; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.XARecoveryBrokerTest; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; -import org.apache.activemq.store.jpa.JPAReferenceStoreAdapter; - -/** - * Used to verify that recovery works correctly against - * - * @version $Revision$ - */ -public class QuickJPAStoreXARecoveryBrokerTest extends XARecoveryBrokerTest { - - public static Test suite() { - return suite(QuickJPAStoreXARecoveryBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - protected BrokerService createBroker() throws Exception { - BrokerService service = new BrokerService(); - service.setDeleteAllMessagesOnStartup(true); - AMQPersistenceAdapter pa = new AMQPersistenceAdapter(); - - JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter(); - Properties props = new Properties(); - props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); - props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); -// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); - rfa.setEntityManagerProperties(props); - pa.setReferenceStoreAdapter(rfa); - - service.setPersistenceAdapter(pa); - return service; - } - - protected BrokerService createRestartedBroker() throws Exception { - BrokerService service = new BrokerService(); - AMQPersistenceAdapter pa = new AMQPersistenceAdapter(); - - JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter(); - Properties props = new Properties(); - props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver"); - props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true"); - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema"); -// props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); - rfa.setEntityManagerProperties(props); - pa.setReferenceStoreAdapter(rfa); - - service.setPersistenceAdapter(pa); - return service; - } - -} diff --git a/activemq-jpa-store/src/test/resources/log4j.properties b/activemq-jpa-store/src/test/resources/log4j.properties deleted file mode 100644 index 3f12e671d1..0000000000 --- a/activemq-jpa-store/src/test/resources/log4j.properties +++ /dev/null @@ -1,35 +0,0 @@ -## --------------------------------------------------------------------------- -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## --------------------------------------------------------------------------- - -# -# The logging properties used during tests.. -# -log4j.rootLogger=INFO, out - -log4j.logger.org.apache.activemq.spring=WARN - -# CONSOLE appender not used by default -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n - -# File appender -log4j.appender.out=org.apache.log4j.FileAppender -log4j.appender.out.layout=org.apache.log4j.PatternLayout -log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n -log4j.appender.out.file=target/activemq-test.log -log4j.appender.out.append=true diff --git a/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml b/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml deleted file mode 100644 index a531adaad9..0000000000 --- a/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/jpabroker.xml +++ /dev/null @@ -1,44 +0,0 @@ - - - - - - - - - - - - - - - - - org.apache.derby.jdbc.EmbeddedDriver - jdbc:derby:activemq-data/derby;create=true - buildSchema - - - - - - diff --git a/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml b/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml deleted file mode 100644 index 5366f1bc25..0000000000 --- a/activemq-jpa-store/src/test/resources/org/apache/activemq/broker/store/quickjpabroker.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - - - - - - - - - - - - - - - - - org.apache.derby.jdbc.EmbeddedDriver - jdbc:derby:activemq-data/derby;create=true - buildSchema - - - - - - - diff --git a/pom.xml b/pom.xml index 0d06d779a2..737e58b67b 100755 --- a/pom.xml +++ b/pom.xml @@ -126,7 +126,6 @@ activemq-core activemq-fileserver activemq-jaas - activemq-jpa-store activemq-openwire-generator activemq-optional activemq-pool @@ -183,11 +182,6 @@ activemq-jaas ${activemq-version} - - org.apache.activemq - activemq-jpa-store - ${activemq-version} - org.apache.activemq activemq-jmdns_1.0