Added the initial cut of a JPA based message store.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@477273 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-20 18:13:50 +00:00
parent ea66d50338
commit 0607226831
10 changed files with 1072 additions and 29 deletions

View File

@ -156,6 +156,10 @@
<version>1.2.24</version> <version>1.2.24</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-persistence-jdbc</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -270,6 +274,7 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId> <artifactId>maven-antrun-plugin</artifactId>
<!--
<configuration> <configuration>
<tasks> <tasks>
<taskdef name="generate" classname="org.apache.activemq.openwire.tool.JavaGeneratorTask"/> <taskdef name="generate" classname="org.apache.activemq.openwire.tool.JavaGeneratorTask"/>
@ -283,34 +288,39 @@
<version>${activemq-version}</version> <version>${activemq-version}</version>
</dependency> </dependency>
</dependencies> </dependencies>
-->
<executions>
<execution>
<phase>process-classes</phase>
<configuration>
<tasks>
<path id="cp">
<path refid="maven.test.classpath"/>
<path refid="maven.compile.classpath"/>
<path refid="maven.dependency.classpath"/>
</path>
<taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask">
<classpath refid="cp"/>
</taskdef>
<openjpac directory="${basedir}/target/jpa-classes">
<classpath refid="cp"/>
<fileset dir="${basedir}/target/classes">
<include name="org/apache/activemq/store/jpa/model/*.class"/>
</fileset>
</openjpac>
<copy todir="${basedir}/target/classes">
<fileset dir="${basedir}/target/jpa-classes"/>
</copy>
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin> </plugin>
<!-- Use Gram to Gernerate the OpenWire Marshallers -->
<!--
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>maven-gram-plugin</artifactId>
<version>4.1-incubator</version>
<configuration>
<scripts>
:GenerateJavaMarshalling.groovy: GenerateJavaTests.groovy: GenerateCSharpMarshalling.groovy:
GenerateCSharpClasses.groovy: GenerateCppMarshallingClasses.groovy: GenerateCppMarshallingHeaders.groovy:
GenerateCppHeaders.groovy: GenerateCppClasses.groovy: GenerateCMarshalling.groovy:
</scripts>
<groovyProperties>
<version>2</version>
</groovyProperties>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-generator</artifactId>
<version>${activemq-version}</version>
</dependency>
</dependencies>
</plugin>
-->
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId> <artifactId>javacc-maven-plugin</artifactId>

View File

@ -0,0 +1,194 @@
package org.apache.activemq.store.jpa;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.jpa.model.StoredMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
public class JPAMessageStore implements MessageStore {
protected final JPAPersistenceAdapter adapter;
protected final WireFormat wireFormat;
protected final ActiveMQDestination destination;
protected final String destinationName;
protected AtomicLong lastMessageId = new AtomicLong(-1);
public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
this.adapter = adapter;
this.destination = destination;
this.destinationName = destination.getQualifiedName();
this.wireFormat = this.adapter.getWireFormat();
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
ByteSequence sequence = wireFormat.marshal(message);
sequence.compact();
StoredMessage sm = new StoredMessage();
sm.setDestination(destinationName);
sm.setId(message.getMessageId().getBrokerSequenceId());
sm.setMessageId(message.getMessageId().toString());
sm.setExiration(message.getExpiration());
sm.setData(sequence.data);
manager.persist(sm);
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void addMessageReference(ConnectionContext context,
MessageId messageId, long expirationTime, String messageRef)
throws IOException {
throw new IOException("Not implemented.");
}
public ActiveMQDestination getDestination() {
return destination;
}
public Message getMessage(MessageId identity) throws IOException {
Message rc;
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredMessage message=null;
if( identity.getBrokerSequenceId()!= 0 ) {
message = manager.find(StoredMessage.class, identity.getBrokerSequenceId());
} else {
Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1");
query.setParameter(1, identity.toString());
message = (StoredMessage) query.getSingleResult();
}
rc = (Message) wireFormat.unmarshal(new ByteSequence(message.getData()));
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public int getMessageCount() throws IOException {
Integer rc;
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery("select count(m) from StoredMessage m");
rc = (Integer) query.getSingleResult();
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public String getMessageReference(MessageId identity) throws IOException {
throw new IOException("Not implemented.");
}
public void recover(MessageRecoveryListener container) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc");
query.setParameter(1, destinationName);
for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
container.recoverMessage(message);
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
query.setParameter(1, destinationName);
query.setParameter(2, lastMessageId.get());
query.setMaxResults(maxReturned);
int count = 0;
for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
listener.recoverMessage(message);
lastMessageId.set(m.getId());
count++;
if( count >= maxReturned ) {
return;
}
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1");
query.setParameter(1, destinationName);
query.executeUpdate();
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
Query query = manager.createQuery("delete from StoredMessage m where m.id=?1");
query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
query.executeUpdate();
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void resetBatching() {
lastMessageId.set(-1);
}
public void setUsageManager(UsageManager usageManager) {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
}

View File

@ -0,0 +1,253 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import javax.persistence.Query;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An implementation of {@link PersistenceAdapter} that uses JPA to
* store it's messages.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.17 $
*/
public class JPAPersistenceAdapter implements PersistenceAdapter {
private static final Log log = LogFactory.getLog(JPAPersistenceAdapter.class);
String entityManagerName = "activemq";
Properties entityManagerProperties = System.getProperties();
EntityManagerFactory entityManagerFactory;
private WireFormat wireFormat;
private MemoryTransactionStore transactionStore;
public void beginTransaction(ConnectionContext context) throws IOException {
if( context.getLongTermStoreContext()!=null )
throw new IOException("Transation already started.");
EntityManager manager = getEntityManagerFactory().createEntityManager();
manager.getTransaction().begin();
context.setLongTermStoreContext(manager);
}
public void commitTransaction(ConnectionContext context) throws IOException {
EntityManager manager = (EntityManager) context.getLongTermStoreContext();
if( manager==null )
throw new IOException("Transation not started.");
context.setLongTermStoreContext(null);
manager.getTransaction().commit();
manager.close();
}
public void rollbackTransaction(ConnectionContext context) throws IOException {
EntityManager manager = (EntityManager) context.getLongTermStoreContext();
if( manager==null )
throw new IOException("Transation not started.");
context.setLongTermStoreContext(null);
manager.getTransaction().rollback();
manager.close();
}
public EntityManager beginEntityManager(ConnectionContext context) {
if( context==null || context.getLongTermStoreContext()==null ) {
EntityManager manager = getEntityManagerFactory().createEntityManager();
manager.getTransaction().begin();
return manager;
} else {
return (EntityManager) context.getLongTermStoreContext();
}
}
public void commitEntityManager(ConnectionContext context, EntityManager manager) {
if( context==null || context.getLongTermStoreContext()==null ) {
manager.getTransaction().commit();
manager.close();
}
}
public void rollbackEntityManager(ConnectionContext context, EntityManager manager) {
if( context==null || context.getLongTermStoreContext()==null ) {
manager.getTransaction().rollback();
manager.close();
}
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = new JPAMessageStore(this, destination);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
return rc;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = new JPATopicMessageStore(this, destination);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
return rc;
}
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new MemoryTransactionStore();
}
return this.transactionStore;
}
public void deleteAllMessages() throws IOException {
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("delete from StoredMessage m");
query.executeUpdate();
query = manager.createQuery("delete from StoredSubscription ss");
query.executeUpdate();
} catch (Throwable e) {
rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
commitEntityManager(null,manager);
}
public Set getDestinations() {
HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("select distinct m.destination from StoredMessage m");
for (String dest : (List<String>)query.getResultList()) {
rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
}
} catch (RuntimeException e) {
rollbackEntityManager(null,manager);
throw e;
}
commitEntityManager(null,manager);
return rc;
}
public long getLastMessageBrokerSequenceId() throws IOException {
long rc=0;
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("select max(m.id) from StoredMessage m");
Long t = (Long) query.getSingleResult();
if( t != null ) {
rc = t;
}
} catch (Throwable e) {
rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
commitEntityManager(null,manager);
return rc;
}
public boolean isUseExternalMessageReferences() {
return false;
}
public void setUsageManager(UsageManager usageManager) {
}
public void setUseExternalMessageReferences(boolean enable) {
if( enable ) {
throw new IllegalArgumentException("This persistence adapter does not support externa message references");
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
if( entityManagerFactory !=null ) {
entityManagerFactory.close();
}
}
public EntityManagerFactory getEntityManagerFactory() {
if( entityManagerFactory == null ) {
entityManagerFactory = createEntityManagerFactory();
}
return entityManagerFactory;
}
protected EntityManagerFactory createEntityManagerFactory() {
return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties());
}
public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
this.entityManagerFactory = entityManagerFactory;
}
public Properties getEntityManagerProperties() {
return entityManagerProperties;
}
public void setEntityManagerProperties(
Properties entityManagerProperties) {
this.entityManagerProperties = entityManagerProperties;
}
public String getEntityManagerName() {
return entityManagerName;
}
public void setEntityManagerName(String entityManager) {
this.entityManagerName = entityManager;
}
public WireFormat getWireFormat() {
if(wireFormat==null) {
wireFormat = createWireFormat();
}
return wireFormat;
}
private WireFormat createWireFormat() {
OpenWireFormatFactory wff = new OpenWireFormatFactory();
return wff.createWireFormat();
}
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
}

View File

@ -0,0 +1,233 @@
package org.apache.activemq.store.jpa;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.jpa.model.StoredMessage;
import org.apache.activemq.store.jpa.model.StoredSubscription;
import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore {
private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
super(adapter, destination);
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
ss.setLastAckedId(messageId.getBrokerSequenceId());
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = new StoredSubscription();
ss.setClientId(clientId);
ss.setSubscriptionName(subscriptionName);
ss.setDestination(destinationName);
ss.setSelector(selector);
ss.setLastAckedId(-1);
if( !retroactive ) {
Query query = manager.createQuery("select max(m.id) from StoredMessage m");
Long rc = (Long) query.getSingleResult();
if( rc != null ) {
ss.setLastAckedId(rc);
}
}
manager.persist(ss);
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
manager.remove(ss);
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
Query query = manager.createQuery(
"select ss from StoredSubscription ss " +
"where ss.clientId=?1 " +
"and ss.subscriptionName=?2 " +
"and ss.destination=?3");
query.setParameter(1, clientId);
query.setParameter(2, subscriptionName);
query.setParameter(3, destinationName);
List<StoredSubscription> resultList = query.getResultList();
if( resultList.isEmpty() )
return null;
return resultList.get(0);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
SubscriptionInfo rc[];
EntityManager manager = adapter.beginEntityManager(null);
try {
ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
query.setParameter(1, destinationName);
for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(ss.getClientId());
info.setDestination(destination);
info.setSelector(ss.getSelector());
info.setSubcriptionName(ss.getSubscriptionName());
l.add(info);
}
rc = new SubscriptionInfo[l.size()];
l.toArray(rc);
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
Integer rc;
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery(
"select count(m) FROM StoredMessage m, StoredSubscription ss " +
"where ss.clientId=?1 " +
"and ss.subscriptionName=?2 " +
"and ss.destination=?3 " +
"and m.desination=ss.destination and m.id > ss.lastAckedId");
query.setParameter(1, clientId);
query.setParameter(2, subscriptionName);
query.setParameter(2, destinationName);
rc = (Integer) query.getSingleResult();
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
SubscriptionInfo rc=null;
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
if( ss != null ) {
rc = new SubscriptionInfo();
rc.setClientId(ss.getClientId());
rc.setDestination(destination);
rc.setSelector(ss.getSelector());
rc.setSubcriptionName(ss.getSubscriptionName());
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
SubscriptionId id = new SubscriptionId();
id.setClientId(clientId);
id.setSubscriptionName(subscriptionName);
id.setDestination(destinationName);
AtomicLong last=subscriberLastMessageMap.get(id);
if(last==null){
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
last=new AtomicLong(ss.getLastAckedId());
subscriberLastMessageMap.put(id,last);
}
final AtomicLong lastMessageId=last;
Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
query.setParameter(1, destinationName);
query.setParameter(2, lastMessageId.get());
query.setMaxResults(maxReturned);
int count = 0;
for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
listener.recoverMessage(message);
lastMessageId.set(m.getId());
count++;
if( count >= maxReturned ) {
return;
}
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
query.setParameter(1, destinationName);
query.setParameter(2, ss.getLastAckedId());
for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
Message message = (Message) wireFormat.unmarshal(new ByteSequence(m.getData()));
listener.recoverMessage(message);
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void resetBatching(String clientId, String subscriptionName) {
SubscriptionId id = new SubscriptionId();
id.setClientId(clientId);
id.setSubscriptionName(subscriptionName);
id.setDestination(destinationName);
subscriberLastMessageMap.remove(id);
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa.model;
import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
*/
@Entity
public class StoredMessage {
@Id
private long id;
@Basic
private String messageId;
@Basic
private String destination;
@Basic
private long exiration;
@Basic
private byte[] data;
public StoredMessage() {
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public long getExiration() {
return exiration;
}
public void setExiration(long exiration) {
this.exiration = exiration;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public long getId() {
return id;
}
public void setId(long sequenceId) {
this.id = sequenceId;
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright 2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa.model;
import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
/**
*/
@Entity
public class StoredSubscription {
/**
* Application identity class for Magazine.
*/
public static class SubscriptionId {
public String destination;
public String clientId;
public String subscriptionName;
public boolean equals(Object other) {
if (other == this)
return true;
if (!(other instanceof SubscriptionId))
return false;
SubscriptionId sid = (SubscriptionId) other;
return (destination == sid.destination || (destination != null && destination.equals(sid.destination)))
&& (clientId == sid.clientId || (clientId != null && clientId.equals(sid.clientId)))
&& (subscriptionName == sid.subscriptionName || (subscriptionName != null && subscriptionName.equals(sid.subscriptionName)));
}
/**
* Hashcode must also depend on identity values.
*/
public int hashCode() {
return ((destination == null) ? 0 : destination.hashCode())
^ ((clientId == null) ? 0 : clientId.hashCode())
^ ((subscriptionName == null) ? 0 : subscriptionName.hashCode())
;
}
public String toString() {
return destination + ":" + clientId + ":" + subscriptionName;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getSubscriptionName() {
return subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
}
@Id
@GeneratedValue(strategy=GenerationType.AUTO)
private long id;
@Basic
private String destination;
@Basic
private String clientId;
@Basic
private String subscriptionName;
@Basic
private long lastAckedId;
@Basic
private String selector;
public long getLastAckedId() {
return lastAckedId;
}
public void setLastAckedId(long lastAckedId) {
this.lastAckedId = lastAckedId;
}
public String getSelector() {
return selector;
}
public void setSelector(String selector) {
this.selector = selector;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getSubscriptionName() {
return subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
}

View File

@ -54,5 +54,14 @@ public class ByteSequence {
public void setOffset(int offset) { public void setOffset(int offset) {
this.offset = offset; this.offset = offset;
} }
public void compact() {
if( length != data.length ) {
byte t[] = new byte[length];
System.arraycopy(data, offset, t, 0, length);
data=t;
offset=0;
}
}
} }

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2006 The Apache Software Foundation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
version="1.0">
<persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL">
<provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
<class>org.apache.activemq.store.jpa.model.StoredMessage</class>
<class>org.apache.activemq.store.jpa.model.StoredSubscription</class>
<!--
<class>org.apache.activemq.store.jpa.model.StoredSubscription$SubscriptionId</class>
-->
</persistence-unit>
</persistence>

View File

@ -0,0 +1,71 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.store;
import java.util.Properties;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.store.jpa.JPAPersistenceAdapter;
/**
* Used to verify that recovery works correctly against
*
* @version $Revision$
*/
public class JPARecoveryBrokerTest extends RecoveryBrokerTest {
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setDeleteAllMessagesOnStartup(true);
JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
Properties props = new Properties();
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
pa.setEntityManagerProperties(props);
service.setPersistenceAdapter(pa);
return service;
}
protected BrokerService createRestartedBroker() throws Exception {
BrokerService service = new BrokerService();
JPAPersistenceAdapter pa = new JPAPersistenceAdapter();
Properties props = new Properties();
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
pa.setEntityManagerProperties(props);
service.setPersistenceAdapter(pa);
return service;
}
public static Test suite() {
return suite(JPARecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

13
pom.xml
View File

@ -314,6 +314,12 @@
<version>${commons-collections-version}</version> <version>${commons-collections-version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-persistence-jdbc</artifactId>
<version>${openjpa-version}</version>
</dependency>
<!-- Optional Spring Support --> <!-- Optional Spring Support -->
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
@ -868,8 +874,9 @@
<axis-version>1.2-RC1</axis-version> <axis-version>1.2-RC1</axis-version>
<cglib-version>2.0</cglib-version> <cglib-version>2.0</cglib-version>
<commons-beanutils-version>1.6.1</commons-beanutils-version> <commons-beanutils-version>1.6.1</commons-beanutils-version>
<commons-collections-version>2.1</commons-collections-version> <commons-collections-version>3.1</commons-collections-version>
<commons-dbcp-version>1.2</commons-dbcp-version> <openjpa-version>0.9.6-incubating</openjpa-version>
<commons-dbcp-version>1.2.1</commons-dbcp-version>
<commons-httpclient-version>2.0.1</commons-httpclient-version> <commons-httpclient-version>2.0.1</commons-httpclient-version>
<commons-logging-version>1.1</commons-logging-version> <commons-logging-version>1.1</commons-logging-version>
<commons-pool-version>1.2</commons-pool-version> <commons-pool-version>1.2</commons-pool-version>
@ -887,7 +894,7 @@
<junit-version>3.8.1</junit-version> <junit-version>3.8.1</junit-version>
<jxta-version>2.0</jxta-version> <jxta-version>2.0</jxta-version>
<log4j-version>1.2.12</log4j-version> <log4j-version>1.2.12</log4j-version>
<org-apache-derby-version>10.1.1.0</org-apache-derby-version> <org-apache-derby-version>10.1.3.1</org-apache-derby-version>
<org-apache-geronimo-specs-version>1.0</org-apache-geronimo-specs-version> <org-apache-geronimo-specs-version>1.0</org-apache-geronimo-specs-version>
<org-apache-maven-surefire-plugin-version>2.2</org-apache-maven-surefire-plugin-version> <org-apache-maven-surefire-plugin-version>2.2</org-apache-maven-surefire-plugin-version>
<p2psockets-version>1.1.2</p2psockets-version> <p2psockets-version>1.1.2</p2psockets-version>