git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@392434 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-07 22:02:07 +00:00
parent 3c10ee0f52
commit 72923b8cbb
8 changed files with 476 additions and 21 deletions

View File

@ -38,6 +38,10 @@ public class KahaMessageStore implements MessageStore{
this.messageContainer=container;
this.destination=destination;
}
public Object getId(){
return messageContainer.getId();
}
public void addMessage(ConnectionContext context,Message message) throws IOException{
messageContainer.put(message.getMessageId().toString(),message);

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -34,8 +33,6 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
@ -43,20 +40,21 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
* @version $Revision: 1.4 $
*/
public class KahaPersistentAdaptor implements PersistenceAdapter{
MemoryTransactionStore transactionStore;
static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
KahaTransactionStore transactionStore;
ConcurrentHashMap topics=new ConcurrentHashMap();
ConcurrentHashMap queues=new ConcurrentHashMap();
ConcurrentHashMap messageStores=new ConcurrentHashMap();
private boolean useExternalMessageReferences;
private WireFormat wireFormat = new OpenWireFormat();
private OpenWireFormat wireFormat=new OpenWireFormat();
Store store;
public KahaPersistentAdaptor(File dir) throws IOException{
if (!dir.exists()){
if(!dir.exists()){
dir.mkdirs();
}
String name = dir.getAbsolutePath() + File.separator + "kaha.db";
String name=dir.getAbsolutePath()+File.separator+"kaha.db";
store=StoreFactory.open(name,"rw");
}
public Set getDestinations(){
@ -74,6 +72,7 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
MessageStore rc=(MessageStore) queues.get(destination);
if(rc==null){
rc=new KahaMessageStore(getMapContainer(destination),destination);
messageStores.put(destination, rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
@ -92,17 +91,28 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
ackContainer.load();
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination, rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
topics.put(destination,rc);
}
return rc;
}
protected MessageStore retrieveMessageStore(Object id){
MessageStore result = (MessageStore) messageStores.get(id);
return result;
}
public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){
transactionStore=new MemoryTransactionStore();
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME);
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
container.load();
transactionStore=new KahaTransactionStore(this,container);
}
return transactionStore;
}
@ -155,8 +165,8 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
}
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
* @param usageManager
* The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
public void setUsageManager(UsageManager usageManager){}
}

View File

@ -16,7 +16,6 @@ package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -55,7 +54,6 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberAcks.size();
if(subscriberCount>0){
super.addMessage(context,message);
String id=message.getMessageId().toString();
ackContainer.put(id,new AtomicInteger(subscriberCount));
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
@ -63,6 +61,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ListContainer container=store.getListContainer(key);
container.add(id);
}
super.addMessage(context,message);
}
}
@ -79,7 +78,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so remove it
container.remove(id);
super.removeMessage(messageId);
}
}
}

View File

@ -0,0 +1,100 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Stores a messages/acknowledgements for a transaction
*
* @version $Revision: 1.4 $
*/
class KahaTransaction{
private static final Log log=LogFactory.getLog(KahaTransaction.class);
protected List list=new ArrayList();
void add(KahaMessageStore store,BaseCommand command){
TxCommand tx=new TxCommand();
tx.setCommand(command);
tx.setMessageStoreKey(store.getId());
list.add(tx);
}
Message[] getMessages(){
List result=new ArrayList();
for(int i=0;i<list.size();i++){
TxCommand command=(TxCommand) list.get(i);
if(command.isAdd()){
result.add(command.getCommand());
}
}
Message[] messages=new Message[result.size()];
return (Message[]) result.toArray(messages);
}
MessageAck[] getAcks(){
List result=new ArrayList();
for(int i=0;i<list.size();i++){
TxCommand command=(TxCommand) list.get(i);
if(command.isRemove()){
result.add(command.getCommand());
}
}
MessageAck[] acks=new MessageAck[result.size()];
return (MessageAck[]) result.toArray(acks);
}
void prepare(){}
void rollback(){
list.clear();
}
/**
* @throws IOException
*/
void commit(KahaTransactionStore transactionStore) throws IOException{
for(int i=0;i<list.size();i++){
TxCommand command=(TxCommand) list.get(i);
MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
if(command.isAdd()){
ms.addMessage(null,(Message) command.getCommand());
}
}
for(int i=0;i<list.size();i++){
TxCommand command=(TxCommand) list.get(i);
MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
if(command.isRemove()){
ms.removeMessage(null,(MessageAck) command.getCommand());
}
}
}
List getList(){
return new ArrayList(list);
}
void setList(List list){
this.list = list;
}
}

View File

@ -0,0 +1,176 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Provides a TransactionStore implementation that can create transaction aware MessageStore objects from non
* transaction aware MessageStore objects.
*
* @version $Revision: 1.4 $
*/
public class KahaTransactionStore implements TransactionStore{
private Map transactions=new ConcurrentHashMap();
private Map prepared;
private KahaPersistentAdaptor adaptor;
KahaTransactionStore(KahaPersistentAdaptor adaptor,Map preparedMap){
this.adaptor=adaptor;
this.prepared=preparedMap;
}
public MessageStore proxy(MessageStore messageStore){
return new ProxyMessageStore(messageStore){
public void addMessage(ConnectionContext context,final Message send) throws IOException{
KahaTransactionStore.this.addMessage(getDelegate(),send);
}
public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
KahaTransactionStore.this.removeMessage(getDelegate(),ack);
}
};
}
public TopicMessageStore proxy(TopicMessageStore messageStore){
return new ProxyTopicMessageStore(messageStore){
public void addMessage(ConnectionContext context,final Message send) throws IOException{
KahaTransactionStore.this.addMessage(getDelegate(),send);
}
public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException{
KahaTransactionStore.this.removeMessage(getDelegate(),ack);
}
};
}
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid){
KahaTransaction tx=getTx(txid);
if(tx!=null){
tx.prepare();
prepared.put(txid,tx);
}
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
KahaTransaction tx=getTx(txid);
if(tx!=null){
tx.commit(this);
removeTx(txid);
}
}
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid){
KahaTransaction tx=getTx(txid);
if(tx!=null){
tx.rollback();
removeTx(txid);
}
}
public void start() throws Exception{}
public void stop() throws Exception{}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
for(Iterator i=prepared.entrySet().iterator();i.hasNext();){
Map.Entry entry=(Entry) i.next();
XATransactionId xid=(XATransactionId) entry.getKey();
KahaTransaction kt=(KahaTransaction) entry.getValue();
listener.recover(xid,kt.getMessages(),kt.getAcks());
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(final MessageStore destination,final Message message) throws IOException{
if(message.isInTransaction()){
KahaTransaction tx=getOrCreateTx(message.getTransactionId());
tx.add((KahaMessageStore) destination,message);
}else{
destination.addMessage(null,message);
}
}
/**
* @param ack
* @throws IOException
*/
private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
if(ack.isInTransaction()){
KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
tx.add((KahaMessageStore) destination,ack);
}else{
destination.removeMessage(null,ack);
}
}
protected synchronized KahaTransaction getTx(TransactionId key){
KahaTransaction result=(KahaTransaction) transactions.get(key);
if(result==null){
result=(KahaTransaction) prepared.get(key);
}
return result;
}
protected synchronized KahaTransaction getOrCreateTx(TransactionId key){
KahaTransaction result=(KahaTransaction) transactions.get(key);
if(result==null){
result=new KahaTransaction();
transactions.put(key,result);
}
return result;
}
protected synchronized void removeTx(TransactionId key){
transactions.remove(key);
prepared.remove(key);
}
public void delete(){
transactions.clear();
prepared.clear();
}
protected MessageStore getStoreById(Object id){
return adaptor.retrieveMessageStore(id);
}
}

View File

@ -0,0 +1,87 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.kaha.Marshaller;
/**
* Marshall a Transaction
* @version $Revision: 1.10 $
*/
public class TransactionMarshaller implements Marshaller{
private WireFormat wireFormat;
public TransactionMarshaller(WireFormat wireFormat){
this.wireFormat = wireFormat;
}
public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
KahaTransaction kt = (KahaTransaction) object;
List list = kt.getList();
dataOut.writeInt(list.size());
for (int i = 0; i < list.size(); i++){
TxCommand tx = (TxCommand) list.get(i);
Object key = tx.getMessageStoreKey();
Packet packet = wireFormat.marshal(key);
byte[] data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
dataOut.write(data);
Object command = tx.getCommand();
packet = wireFormat.marshal(command);
data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
dataOut.write(data);
}
}
public Object readPayload(DataInputStream dataIn) throws IOException{
KahaTransaction result = new KahaTransaction();
List list = new ArrayList();
result.setList(list);
int number=dataIn.readInt();
for (int i = 0; i < number; i++){
TxCommand command = new TxCommand();
int size = dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
Object key = wireFormat.unmarshal(new ByteArrayPacket(data));
command.setMessageStoreKey(key);
size = dataIn.readInt();
data=new byte[size];
dataIn.readFully(data);
BaseCommand bc = (BaseCommand) wireFormat.unmarshal(new ByteArrayPacket(data));
command.setCommand(bc);
list.add(command);
}
return result;
}
}

View File

@ -0,0 +1,76 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.CommandTypes;
/**
* Base class for messages/acknowledgements for a transaction
*
* @version $Revision: 1.4 $
*/
class TxCommand {
protected Object messageStoreKey;
protected BaseCommand command;
/**
* @return Returns the messageStoreKey.
*/
public Object getMessageStoreKey(){
return messageStoreKey;
}
/**
* @param messageStoreKey The messageStoreKey to set.
*/
public void setMessageStoreKey(Object messageStoreKey){
this.messageStoreKey=messageStoreKey;
}
/**
* @return Returns the command.
*/
public BaseCommand getCommand(){
return command;
}
/**
* @param command The command to set.
*/
public void setCommand(BaseCommand command){
this.command=command;
}
/**
* @return true if a Message command
*/
public boolean isAdd(){
return command != null && command.getDataStructureType() != CommandTypes.MESSAGE_ACK;
}
/**
* @return true if a MessageAck command
*/
public boolean isRemove(){
return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
}
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker.store;
import java.io.File;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest;
import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
@ -39,17 +41,18 @@ public class KahaXARecoveryBrokerTest extends XARecoveryBrokerTest {
}
protected BrokerService createBroker() throws Exception {
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/kahabroker.xml"));
brokerFactory.afterPropertiesSet();
BrokerService broker = brokerFactory.getBroker();
BrokerService broker = createRestartedBroker();
broker.setDeleteAllMessagesOnStartup(true);
return broker;
}
protected BrokerService createRestartedBroker() throws Exception {
BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/kahabroker.xml"));
brokerFactory.afterPropertiesSet();
return brokerFactory.getBroker();
BrokerService broker = new BrokerService();
KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File("activemq-data/storetest"));
broker.setPersistenceAdapter(adaptor);
broker.addConnector("tcp://localhost:0");
return broker;
}
}