Split out Transaction class from AMQTrandactionStore -

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@557389 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-18 20:32:45 +00:00
parent 3d1fca08fe
commit 2438ada6ad
4 changed files with 329 additions and 122 deletions

View File

@ -19,13 +19,10 @@
package org.apache.activemq.store.amq;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
@ -39,92 +36,15 @@ import org.apache.activemq.store.TransactionStore;
/**
*/
public class AMQTransactionStore implements TransactionStore {
public class AMQTransactionStore implements TransactionStore{
private final AMQPersistenceAdapter peristenceAdapter;
Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>();
Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
Map<TransactionId,AMQTx> inflightTransactions=new LinkedHashMap<TransactionId,AMQTx>();
Map<TransactionId,AMQTx> preparedTransactions=new LinkedHashMap<TransactionId,AMQTx>();
private boolean doingRecover;
public static class TxOperation {
static final byte ADD_OPERATION_TYPE = 0;
static final byte REMOVE_OPERATION_TYPE = 1;
static final byte ACK_OPERATION_TYPE = 3;
public byte operationType;
public AMQMessageStore store;
public Object data;
public Location location;
public TxOperation(byte operationType, AMQMessageStore store, Object data, Location location) {
this.operationType=operationType;
this.store=store;
this.data=data;
this.location=location;
}
}
/**
* Operations
* @version $Revision: 1.6 $
*/
public static class Tx {
private final Location location;
private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(Location location) {
this.location=location;
}
public void add(AMQMessageStore store, Message msg, Location location) {
operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
}
public void add(AMQMessageStore store, MessageAck ack) {
operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
}
public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
}
public Message[] getMessages() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
list.add(op.data);
}
}
Message rc[] = new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks() {
ArrayList<Object> list = new ArrayList<Object>();
for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
TxOperation op = iter.next();
if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
list.add(op.data);
}
}
MessageAck rc[] = new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
public ArrayList<TxOperation> getOperations() {
return operations;
}
}
public AMQTransactionStore(AMQPersistenceAdapter adapter) {
this.peristenceAdapter = adapter;
public AMQTransactionStore(AMQPersistenceAdapter adapter){
this.peristenceAdapter=adapter;
}
/**
@ -132,7 +52,7 @@ public class AMQTransactionStore implements TransactionStore {
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException{
Tx tx=null;
AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
@ -143,13 +63,13 @@ public class AMQTransactionStore implements TransactionStore {
preparedTransactions.put(txid,tx);
}
}
/**
* @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void replayPrepare(TransactionId txid) throws IOException{
Tx tx=null;
AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
@ -160,13 +80,13 @@ public class AMQTransactionStore implements TransactionStore {
}
}
public Tx getTx(TransactionId txid,Location location){
Tx tx=null;
public AMQTx getTx(TransactionId txid,Location location){
AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.get(txid);
}
if(tx==null){
tx=new Tx(location);
tx=new AMQTx(location);
inflightTransactions.put(txid,tx);
}
return tx;
@ -177,7 +97,7 @@ public class AMQTransactionStore implements TransactionStore {
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
Tx tx;
AMQTx tx;
if(wasPrepared){
synchronized(preparedTransactions){
tx=preparedTransactions.remove(txid);
@ -201,7 +121,7 @@ public class AMQTransactionStore implements TransactionStore {
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
public AMQTx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
if(wasPrepared){
synchronized(preparedTransactions){
return preparedTransactions.remove(txid);
@ -218,7 +138,7 @@ public class AMQTransactionStore implements TransactionStore {
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException{
Tx tx=null;
AMQTx tx=null;
synchronized(inflightTransactions){
tx=inflightTransactions.remove(txid);
}
@ -251,13 +171,13 @@ public class AMQTransactionStore implements TransactionStore {
}
}
}
public void start() throws Exception {
public void start() throws Exception{
}
public void stop() throws Exception {
public void stop() throws Exception{
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
// All the in-flight transactions get rolled back..
synchronized(inflightTransactions){
@ -265,13 +185,13 @@ public class AMQTransactionStore implements TransactionStore {
}
this.doingRecover=true;
try{
Map<TransactionId, Tx> txs=null;
Map<TransactionId,AMQTx> txs=null;
synchronized(preparedTransactions){
txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
txs=new LinkedHashMap<TransactionId,AMQTx>(preparedTransactions);
}
for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
Object txid=iter.next();
Tx tx=txs.get(txid);
AMQTx tx=txs.get(txid);
listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
}
}finally{
@ -283,26 +203,24 @@ public class AMQTransactionStore implements TransactionStore {
* @param message
* @throws IOException
*/
void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
Tx tx = getTx(message.getTransactionId(), location);
tx.add(store, message, location);
void addMessage(AMQMessageStore store,Message message,Location location) throws IOException{
AMQTx tx=getTx(message.getTransactionId(),location);
tx.add(store,message,location);
}
/**
* @param ack
* @throws IOException
*/
public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
}
public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
Tx tx = getTx(ack.getTransactionId(), location);
tx.add(store, ack);
public void removeMessage(AMQMessageStore store,MessageAck ack,Location location) throws IOException{
AMQTx tx=getTx(ack.getTransactionId(),location);
tx.add(store,ack);
}
public void acknowledge(AMQTopicMessageStore store,JournalTopicAck ack,Location location){
AMQTx tx=getTx(ack.getTransactionId(),location);
tx.add(store,ack);
}
public Location checkpoint() throws IOException{
// Nothing really to checkpoint.. since, we don't
@ -312,18 +230,18 @@ public class AMQTransactionStore implements TransactionStore {
// roll over active tx records.
Location rc=null;
synchronized(inflightTransactions){
for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
for(Iterator<AMQTx> iter=inflightTransactions.values().iterator();iter.hasNext();){
AMQTx tx=iter.next();
Location location=tx.getLocation();
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
}
}
synchronized(preparedTransactions){
for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
Tx tx=iter.next();
Location location=tx.location;
for(Iterator<AMQTx> iter=preparedTransactions.values().iterator();iter.hasNext();){
AMQTx tx=iter.next();
Location location=tx.getLocation();
if(rc==null||rc.compareTo(location)<0){
rc=location;
}
@ -332,9 +250,24 @@ public class AMQTransactionStore implements TransactionStore {
}
}
public boolean isDoingRecover() {
public boolean isDoingRecover(){
return doingRecover;
}
/**
* @return the preparedTransactions
*/
public Map<TransactionId,AMQTx> getPreparedTransactions(){
return this.preparedTransactions;
}
/**
* @param preparedTransactions the preparedTransactions to set
*/
public void setPreparedTransactions(Map<TransactionId,AMQTx> preparedTransactions){
if(preparedTransactions!=null){
this.preparedTransactions.clear();
this.preparedTransactions.putAll(preparedTransactions);
}
}
}

View File

@ -15,10 +15,84 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.Location;
/**
* @version $Revision: 1.1 $
*/
public interface AMQTx {
/**
* Operations
* @version $Revision: 1.6 $
*/
public class AMQTx{
private final Location location;
private ArrayList<AMQTxOperation> operations=new ArrayList<AMQTxOperation>();
public AMQTx(Location location){
this.location=location;
}
public void add(AMQMessageStore store,Message msg,Location location){
operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE,store.getDestination(),msg,location));
}
public void add(AMQMessageStore store,MessageAck ack){
operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE,store.getDestination(),ack,null));
}
public void add(AMQTopicMessageStore store,JournalTopicAck ack){
operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE,store.getDestination(),ack,null));
}
public Message[] getMessages(){
ArrayList<Object> list=new ArrayList<Object>();
for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
AMQTxOperation op=iter.next();
if(op.getOperationType()==AMQTxOperation.ADD_OPERATION_TYPE){
list.add(op.getData());
}
}
Message rc[]=new Message[list.size()];
list.toArray(rc);
return rc;
}
public MessageAck[] getAcks(){
ArrayList<Object> list=new ArrayList<Object>();
for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
AMQTxOperation op=iter.next();
if(op.getOperationType()==AMQTxOperation.REMOVE_OPERATION_TYPE){
list.add(op.getData());
}
}
MessageAck rc[]=new MessageAck[list.size()];
list.toArray(rc);
return rc;
}
/**
* @return the location
*/
public Location getLocation(){
return this.location;
}
public ArrayList<AMQTxOperation> getOperations(){
return operations;
}
public void setOperations(ArrayList<AMQTxOperation> operations){
this.operations=operations;
}
}

View File

@ -0,0 +1,137 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
*/
public class AMQTxOperation {
public static final byte ADD_OPERATION_TYPE=0;
public static final byte REMOVE_OPERATION_TYPE=1;
public static final byte ACK_OPERATION_TYPE=3;
private byte operationType;
private ActiveMQDestination destination;
private Object data;
private Location location;
public AMQTxOperation() {
}
public AMQTxOperation(byte operationType,ActiveMQDestination destination,Object data,Location location){
this.operationType=operationType;
this.destination=destination;
this.data=data;
this.location=location;
}
/**
* @return the data
*/
public Object getData(){
return this.data;
}
/**
* @param data the data to set
*/
public void setData(Object data){
this.data=data;
}
/**
* @return the location
*/
public Location getLocation(){
return this.location;
}
/**
* @param location the location to set
*/
public void setLocation(Location location){
this.location=location;
}
/**
* @return the operationType
*/
public byte getOperationType(){
return this.operationType;
}
/**
* @param operationType the operationType to set
*/
public void setOperationType(byte operationType){
this.operationType=operationType;
}
public boolean replay(AMQPersistenceAdapter adapter,ConnectionContext context) throws IOException{
boolean result=false;
AMQMessageStore store=(AMQMessageStore)adapter.createMessageStore(destination);
if(operationType==ADD_OPERATION_TYPE){
result=store.replayAddMessage(context,(Message)data,location);
}else if(operationType==REMOVE_OPERATION_TYPE){
result=store.replayRemoveMessage(context,(MessageAck)data);
}else{
JournalTopicAck ack=(JournalTopicAck)data;
result=((AMQTopicMessageStore)store).replayAcknowledge(context,ack.getClientId(),ack.getSubscritionName(),
ack.getMessageId());
}
return result;
}
public void writeExternal(WireFormat wireFormat,DataOutput dos) throws IOException {
location.writeExternal(dos);
ByteSequence packet = wireFormat.marshal(getData());
dos.writeInt(packet.length);
dos.write(packet.data, packet.offset, packet.length);
packet = wireFormat.marshal(destination);
dos.writeInt(packet.length);
dos.write(packet.data, packet.offset, packet.length);
}
public void readExternal(WireFormat wireFormat,DataInput dis) throws IOException {
this.location=new Location();
this.location.readExternal(dis);
int size=dis.readInt();
byte[] data=new byte[size];
dis.readFully(data);
setData(wireFormat.unmarshal(new ByteSequence(data)));
size=dis.readInt();
data=new byte[size];
dis.readFully(data);
this.destination=(ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
}
}

View File

@ -0,0 +1,63 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.amq.AMQTx;
import org.apache.activemq.store.amq.AMQTxOperation;
import org.apache.activemq.wireformat.WireFormat;
/**
* Marshall an AMQTx
* @version $Revision: 1.10 $
*/
public class AMQTxMarshaller implements Marshaller<AMQTx>{
private WireFormat wireFormat;
public AMQTxMarshaller(WireFormat wireFormat){
this.wireFormat=wireFormat;
}
public AMQTx readPayload(DataInput dataIn) throws IOException{
Location location=new Location();
location.readExternal(dataIn);
AMQTx result=new AMQTx(location);
int size=dataIn.readInt();
for(int i=0;i<size;i++){
AMQTxOperation op=new AMQTxOperation();
op.readExternal(wireFormat,dataIn);
result.getOperations().add(op);
}
return result;
}
public void writePayload(AMQTx amqtx,DataOutput dataOut) throws IOException{
amqtx.getLocation().writeExternal(dataOut);
List<AMQTxOperation> list=amqtx.getOperations();
dataOut.writeInt(list.size());
for(AMQTxOperation op:list){
op.writeExternal(wireFormat,dataOut);
}
}
}