Added a JPA based ReferenceStoreAdapter implementation

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@493226 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-01-05 23:05:40 +00:00
parent c1612b5e06
commit e30a9dddde
16 changed files with 1128 additions and 20 deletions

View File

@ -88,7 +88,7 @@ public class QuickPersistenceAdapter implements PersistenceAdapter, UsageListene
private UsageManager usageManager; private UsageManager usageManager;
private long cleanupInterval = 1000 * 1/10; private long cleanupInterval = 1000 * 60;
private long checkpointInterval = 1000 * 10; private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1; private int maxCheckpointWorkers = 1;

View File

@ -73,11 +73,39 @@
<artifactId>commons-pool</artifactId> <artifactId>commons-pool</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<optional>true</optional>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.xbean</groupId>
<artifactId>maven-xbean-plugin</artifactId>
<version>${xbean-version}</version>
<executions>
<execution>
<configuration>
<namespace>http://activemq.org/activemq-jpa-store/config/1.0</namespace>
<schema>target/xbean/activemq-jpa-store.xsd</schema>
</configuration>
<goals>
<goal>mapping</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId> <artifactId>maven-antrun-plugin</artifactId>

View File

@ -1,3 +1,20 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa; package org.apache.activemq.store.jpa;
import java.io.IOException; import java.io.IOException;
@ -59,12 +76,6 @@ public class JPAMessageStore implements MessageStore {
adapter.commitEntityManager(context,manager); adapter.commitEntityManager(context,manager);
} }
public void addMessageReference(ConnectionContext context,
MessageId messageId, long expirationTime, String messageRef)
throws IOException {
throw new IOException("Not implemented.");
}
public ActiveMQDestination getDestination() { public ActiveMQDestination getDestination() {
return destination; return destination;
} }
@ -105,10 +116,7 @@ public class JPAMessageStore implements MessageStore {
return rc.intValue(); return rc.intValue();
} }
public String getMessageReference(MessageId identity) throws IOException { @SuppressWarnings("unchecked")
throw new IOException("Not implemented.");
}
public void recover(MessageRecoveryListener container) throws Exception { public void recover(MessageRecoveryListener container) throws Exception {
EntityManager manager = adapter.beginEntityManager(null); EntityManager manager = adapter.beginEntityManager(null);
try { try {
@ -125,6 +133,7 @@ public class JPAMessageStore implements MessageStore {
adapter.commitEntityManager(null,manager); adapter.commitEntityManager(null,manager);
} }
@SuppressWarnings("unchecked")
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null); EntityManager manager = adapter.beginEntityManager(null);

View File

@ -48,7 +48,7 @@ import org.apache.commons.logging.LogFactory;
* An implementation of {@link PersistenceAdapter} that uses JPA to * An implementation of {@link PersistenceAdapter} that uses JPA to
* store it's messages. * store it's messages.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean element="jpaPersistenceAdapter"
* *
* @version $Revision: 1.17 $ * @version $Revision: 1.17 $
*/ */
@ -149,7 +149,7 @@ public class JPAPersistenceAdapter implements PersistenceAdapter {
commitEntityManager(null,manager); commitEntityManager(null,manager);
} }
public Set getDestinations() { public Set<ActiveMQDestination> getDestinations() {
HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
EntityManager manager = beginEntityManager(null); EntityManager manager = beginEntityManager(null);
@ -190,12 +190,6 @@ public class JPAPersistenceAdapter implements PersistenceAdapter {
public void setUsageManager(UsageManager usageManager) { public void setUsageManager(UsageManager usageManager) {
} }
public void setUseExternalMessageReferences(boolean enable) {
if( enable ) {
throw new IllegalArgumentException("This persistence adapter does not support externa message references");
}
}
public void start() throws Exception { public void start() throws Exception {
} }

View File

@ -0,0 +1,210 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.jpa.model.StoredMessageReference;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
public class JPAReferenceStore implements ReferenceStore {
protected final JPAPersistenceAdapter adapter;
protected final WireFormat wireFormat;
protected final ActiveMQDestination destination;
protected final String destinationName;
protected AtomicLong lastMessageId = new AtomicLong(-1);
public JPAReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
this.adapter = adapter;
this.destination = destination;
this.destinationName = destination.getQualifiedName();
this.wireFormat = this.adapter.getWireFormat();
}
public ActiveMQDestination getDestination() {
return destination;
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
public Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
StoredMessageReference sm = new StoredMessageReference();
sm.setDestination(destinationName);
sm.setId(messageId.getBrokerSequenceId());
sm.setMessageId(messageId.toString());
sm.setExiration(data.getExpiration());
sm.setFileId(data.getFileId());
sm.setOffset(data.getOffset());
manager.persist(sm);
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public ReferenceData getMessageReference(MessageId identity) throws IOException {
ReferenceData rc=null;
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredMessageReference message=null;
if( identity.getBrokerSequenceId()!= 0 ) {
message = manager.find(StoredMessageReference.class, identity.getBrokerSequenceId());
} else {
Query query = manager.createQuery("select m from StoredMessageReference m where m.messageId=?1");
query.setParameter(1, identity.toString());
message = (StoredMessageReference) query.getSingleResult();
}
if( message !=null ) {
rc = new ReferenceData();
rc.setExpiration(message.getExiration());
rc.setFileId(message.getFileId());
rc.setOffset(message.getOffset());
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public int getMessageCount() throws IOException {
Long rc;
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery("select count(m) from StoredMessageReference m");
rc = (Long) query.getSingleResult();
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc.intValue();
}
public void recover(MessageRecoveryListener container) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 order by m.id asc");
query.setParameter(1, destinationName);
for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
MessageId id = new MessageId(m.getMessageId());
id.setBrokerSequenceId(m.getId());
container.recoverMessageReference(id);
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
query.setParameter(1, destinationName);
query.setParameter(2, lastMessageId.get());
query.setMaxResults(maxReturned);
int count = 0;
for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
MessageId id = new MessageId(m.getMessageId());
id.setBrokerSequenceId(m.getId());
listener.recoverMessageReference(id);
lastMessageId.set(m.getId());
count++;
if( count >= maxReturned ) {
return;
}
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
Query query = manager.createQuery("delete from StoredMessageReference m where m.destination=?1");
query.setParameter(1, destinationName);
query.executeUpdate();
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
Query query = manager.createQuery("delete from StoredMessageReference m where m.id=?1");
query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
query.executeUpdate();
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void resetBatching() {
lastMessageId.set(-1);
}
public void setUsageManager(UsageManager usageManager) {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
}

View File

@ -0,0 +1,131 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.util.IOExceptionSupport;
/**
* An implementation of {@link ReferenceStoreAdapter} that uses JPA to
* store it's message references.
*
* @org.apache.xbean.XBean element="jpaReferenceStoreAdapter"
*
* @version $Revision: 1.17 $
*/
public class JPAReferenceStoreAdapter extends JPAPersistenceAdapter implements ReferenceStoreAdapter {
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
throw new RuntimeException("Use createQueueReferenceStore instead.");
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
throw new RuntimeException("Use createTopicReferenceStore instead.");
}
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
JPAReferenceStore rc = new JPAReferenceStore(this, destination);
return rc;
}
public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
JPATopicReferenceStore rc = new JPATopicReferenceStore(this, destination);
return rc;
}
public void deleteAllMessages() throws IOException {
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("delete from StoredMessageReference m");
query.executeUpdate();
query = manager.createQuery("delete from StoredSubscription ss");
query.executeUpdate();
} catch (Throwable e) {
rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
commitEntityManager(null,manager);
}
public Set<ActiveMQDestination> getDestinations() {
HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("select distinct m.destination from StoredMessageReference m");
for (String dest : (List<String>)query.getResultList()) {
rc.add(ActiveMQDestination.createDestination(dest,ActiveMQDestination.QUEUE_TYPE));
}
} catch (RuntimeException e) {
rollbackEntityManager(null,manager);
throw e;
}
commitEntityManager(null,manager);
return rc;
}
public long getLastMessageBrokerSequenceId() throws IOException {
long rc=0;
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
Long t = (Long) query.getSingleResult();
if( t != null ) {
rc = t;
}
} catch (Throwable e) {
rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
commitEntityManager(null,manager);
return rc;
}
public Set<Integer> getReferenceFileIdsInUse() throws IOException {
HashSet<Integer> rc=null;
EntityManager manager = beginEntityManager(null);
try {
Query query = manager.createQuery("select distinct m.fileId from StoredMessageReference m");
rc=new HashSet<Integer>((List<Integer>)query.getResultList());
} catch (Throwable e) {
rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
commitEntityManager(null,manager);
return rc;
}
}

View File

@ -1,3 +1,20 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa; package org.apache.activemq.store.jpa;
import java.io.IOException; import java.io.IOException;

View File

@ -0,0 +1,250 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.jpa.model.StoredMessageReference;
import org.apache.activemq.store.jpa.model.StoredSubscription;
import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
import org.apache.activemq.util.IOExceptionSupport;
public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore {
private Map<SubscriptionId,AtomicLong> subscriberLastMessageMap=new ConcurrentHashMap<SubscriptionId,AtomicLong>();
public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
super(adapter, destination);
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
EntityManager manager = adapter.beginEntityManager(context);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
ss.setLastAckedId(messageId.getBrokerSequenceId());
} catch (Throwable e) {
adapter.rollbackEntityManager(context,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(context,manager);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = new StoredSubscription();
ss.setClientId(clientId);
ss.setSubscriptionName(subscriptionName);
ss.setDestination(destinationName);
ss.setSelector(selector);
ss.setLastAckedId(-1);
if( !retroactive ) {
Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
Long rc = (Long) query.getSingleResult();
if( rc != null ) {
ss.setLastAckedId(rc);
}
}
manager.persist(ss);
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
manager.remove(ss);
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
Query query = manager.createQuery(
"select ss from StoredSubscription ss " +
"where ss.clientId=?1 " +
"and ss.subscriptionName=?2 " +
"and ss.destination=?3");
query.setParameter(1, clientId);
query.setParameter(2, subscriptionName);
query.setParameter(3, destinationName);
List<StoredSubscription> resultList = query.getResultList();
if( resultList.isEmpty() )
return null;
return resultList.get(0);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
SubscriptionInfo rc[];
EntityManager manager = adapter.beginEntityManager(null);
try {
ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
query.setParameter(1, destinationName);
for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(ss.getClientId());
info.setDestination(destination);
info.setSelector(ss.getSelector());
info.setSubcriptionName(ss.getSubscriptionName());
l.add(info);
}
rc = new SubscriptionInfo[l.size()];
l.toArray(rc);
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
Long rc;
EntityManager manager = adapter.beginEntityManager(null);
try {
Query query = manager.createQuery(
"select count(m) FROM StoredMessageReference m, StoredSubscription ss " +
"where ss.clientId=?1 " +
"and ss.subscriptionName=?2 " +
"and ss.destination=?3 " +
"and m.destination=ss.destination and m.id > ss.lastAckedId");
query.setParameter(1, clientId);
query.setParameter(2, subscriptionName);
query.setParameter(3, destinationName);
rc = (Long) query.getSingleResult();
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc.intValue();
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
SubscriptionInfo rc=null;
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
if( ss != null ) {
rc = new SubscriptionInfo();
rc.setClientId(ss.getClientId());
rc.setDestination(destination);
rc.setSelector(ss.getSelector());
rc.setSubcriptionName(ss.getSubscriptionName());
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
return rc;
}
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
SubscriptionId id = new SubscriptionId();
id.setClientId(clientId);
id.setSubscriptionName(subscriptionName);
id.setDestination(destinationName);
AtomicLong last=subscriberLastMessageMap.get(id);
if(last==null){
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
last=new AtomicLong(ss.getLastAckedId());
subscriberLastMessageMap.put(id,last);
}
final AtomicLong lastMessageId=last;
Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
query.setParameter(1, destinationName);
query.setParameter(2, lastMessageId.get());
query.setMaxResults(maxReturned);
int count = 0;
for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
MessageId mid = new MessageId(m.getMessageId());
mid.setBrokerSequenceId(m.getId());
listener.recoverMessageReference(mid);
lastMessageId.set(m.getId());
count++;
if( count >= maxReturned ) {
return;
}
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
query.setParameter(1, destinationName);
query.setParameter(2, ss.getLastAckedId());
for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
MessageId mid = new MessageId(m.getMessageId());
mid.setBrokerSequenceId(m.getId());
listener.recoverMessageReference(mid);
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
throw IOExceptionSupport.create(e);
}
adapter.commitEntityManager(null,manager);
}
public void resetBatching(String clientId, String subscriptionName) {
SubscriptionId id = new SubscriptionId();
id.setClientId(clientId);
id.setSubscriptionName(subscriptionName);
id.setDestination(destinationName);
subscriberLastMessageMap.remove(id);
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright 2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jpa.model;
import javax.persistence.Basic;
import javax.persistence.Entity;
import javax.persistence.Id;
import org.apache.openjpa.persistence.jdbc.Index;
/**
*/
@Entity()
public class StoredMessageReference {
@Id
private long id;
@Basic(optional=false)
@Index(enabled=true, unique=false)
private String messageId;
@Basic(optional=false)
@Index(enabled=true, unique=false)
private String destination;
@Basic
@Index(enabled=false, unique=false)
private long exiration;
@Basic(optional=false)
@Index(enabled=false, unique=false)
private int fileId;
@Basic(optional=false)
@Index(enabled=false, unique=false)
private int offset;
public StoredMessageReference() {
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public long getExiration() {
return exiration;
}
public void setExiration(long exiration) {
this.exiration = exiration;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public long getId() {
return id;
}
public void setId(long sequenceId) {
this.id = sequenceId;
}
public int getFileId() {
return fileId;
}
public void setFileId(int fileId) {
this.fileId = fileId;
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
}

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2006 The Apache Software Foundation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
version="1.0">
<persistence-unit name="activemq" transaction-type="RESOURCE_LOCAL">
<provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
<class>org.apache.activemq.store.jpa.model.StoredMessage</class>
<class>org.apache.activemq.store.jpa.model.StoredSubscription</class>
<class>org.apache.activemq.store.jpa.model.StoredMessageReference</class>
</persistence-unit>
</persistence>

View File

@ -0,0 +1,48 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.store;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
/**
*
* @version $Revision$
*/
public class JPAStoreLoadTester extends LoadTester {
protected BrokerService createBroker() throws Exception {
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/jpabroker.xml"));
brokerFactory.afterPropertiesSet();
BrokerService broker = brokerFactory.getBroker();
broker.setDeleteAllMessagesOnStartup(true);
return broker;
}
public static Test suite() {
return suite(JPAStoreLoadTester.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -0,0 +1,48 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.store;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
/**
*
* @version $Revision$
*/
public class QuickJPAStoreLoadTester extends LoadTester {
protected BrokerService createBroker() throws Exception {
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/quickjpabroker.xml"));
brokerFactory.afterPropertiesSet();
BrokerService broker = brokerFactory.getBroker();
broker.setDeleteAllMessagesOnStartup(true);
return broker;
}
public static Test suite() {
return suite(QuickJPAStoreLoadTester.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -0,0 +1,79 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.store;
import java.util.Properties;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.store.jpa.JPAReferenceStoreAdapter;
import org.apache.activemq.store.quick.QuickPersistenceAdapter;
/**
* Used to verify that recovery works correctly against
*
* @version $Revision$
*/
public class QuickJPAStoreRecoveryBrokerTest extends RecoveryBrokerTest {
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setDeleteAllMessagesOnStartup(true);
QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
Properties props = new Properties();
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
service.setPersistenceAdapter(pa);
return service;
}
protected BrokerService createRestartedBroker() throws Exception {
BrokerService service = new BrokerService();
QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
Properties props = new Properties();
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
service.setPersistenceAdapter(pa);
return service;
}
public static Test suite() {
return suite(QuickJPAStoreRecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -0,0 +1,79 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.store;
import java.util.Properties;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest;
import org.apache.activemq.store.jpa.JPAReferenceStoreAdapter;
import org.apache.activemq.store.quick.QuickPersistenceAdapter;
/**
* Used to verify that recovery works correctly against
*
* @version $Revision$
*/
public class QuickJPAStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
public static Test suite() {
return suite(QuickJPAStoreXARecoveryBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setDeleteAllMessagesOnStartup(true);
QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
Properties props = new Properties();
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
service.setPersistenceAdapter(pa);
return service;
}
protected BrokerService createRestartedBroker() throws Exception {
BrokerService service = new BrokerService();
QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
JPAReferenceStoreAdapter rfa = new JPAReferenceStoreAdapter();
Properties props = new Properties();
props.setProperty("openjpa.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver");
props.setProperty("openjpa.ConnectionURL", "jdbc:derby:activemq-data/derby;create=true");
props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema");
props.setProperty("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE");
rfa.setEntityManagerProperties(props);
pa.setReferenceStoreAdapter(rfa);
service.setPersistenceAdapter(pa);
return service;
}
}

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker
brokerName="broker"
persistent="false" useJmx="false"
deleteAllMessagesOnStartup="true"
xmlns="http://activemq.org/config/1.0" persistenceAdapter="#jpa">
<transportConnectors>
<transportConnector uri="tcp://localhost:0"/>
</transportConnectors>
</broker>
<bean class="org.apache.activemq.store.jpa.JPAPersistenceAdapter" id="jpa">
<property name="entityManagerProperties">
<props>
<prop key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
<prop key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
<prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
<prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
</props>
</property>
</bean>
</beans>

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker
brokerName="broker"
persistent="false" useJmx="false"
deleteAllMessagesOnStartup="true"
xmlns="http://activemq.org/config/1.0">
<transportConnectors>
<transportConnector uri="tcp://localhost:0"/>
</transportConnectors>
<persistenceAdapter>
<quickPersistenceAdapter directory="${basedir}/target/activemq-data/quick-broker.db" referenceStoreAdapter="#jpa"/>
</persistenceAdapter>
</broker>
<bean class="org.apache.activemq.store.jpa.JPAReferenceStoreAdapter" id="jpa">
<property name="entityManagerProperties">
<props>
<prop key="openjpa.ConnectionDriverName">org.apache.derby.jdbc.EmbeddedDriver</prop>
<prop key="openjpa.ConnectionURL">jdbc:derby:activemq-data/derby;create=true</prop>
<prop key="openjpa.jdbc.SynchronizeMappings">buildSchema</prop>
<prop key="openjpa.Log=DefaultLevel">WARN,SQL=TRACE</prop>
</props>
</property>
</bean>
</beans>