diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java index aca5cbc5b6..a8415c9b7d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java @@ -19,13 +19,10 @@ package org.apache.activemq.store.amq; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; - import javax.transaction.xa.XAException; - import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTransaction; import org.apache.activemq.command.Message; @@ -39,92 +36,15 @@ import org.apache.activemq.store.TransactionStore; /** */ -public class AMQTransactionStore implements TransactionStore { +public class AMQTransactionStore implements TransactionStore{ private final AMQPersistenceAdapter peristenceAdapter; - Map inflightTransactions = new LinkedHashMap(); - Map preparedTransactions = new LinkedHashMap(); + Map inflightTransactions=new LinkedHashMap(); + Map preparedTransactions=new LinkedHashMap(); private boolean doingRecover; - - public static class TxOperation { - - static final byte ADD_OPERATION_TYPE = 0; - static final byte REMOVE_OPERATION_TYPE = 1; - static final byte ACK_OPERATION_TYPE = 3; - - public byte operationType; - public AMQMessageStore store; - public Object data; - public Location location; - - public TxOperation(byte operationType, AMQMessageStore store, Object data, Location location) { - this.operationType=operationType; - this.store=store; - this.data=data; - this.location=location; - } - - } - /** - * Operations - * @version $Revision: 1.6 $ - */ - public static class Tx { - - private final Location location; - private ArrayList operations = new ArrayList(); - - public Tx(Location location) { - this.location=location; - } - - public void add(AMQMessageStore store, Message msg, Location location) { - operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location)); - } - - public void add(AMQMessageStore store, MessageAck ack) { - operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null)); - } - - public void add(AMQTopicMessageStore store, JournalTopicAck ack) { - operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null)); - } - - public Message[] getMessages() { - ArrayList list = new ArrayList(); - for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = iter.next(); - if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) { - list.add(op.data); - } - } - Message rc[] = new Message[list.size()]; - list.toArray(rc); - return rc; - } - - public MessageAck[] getAcks() { - ArrayList list = new ArrayList(); - for (Iterator iter = operations.iterator(); iter.hasNext();) { - TxOperation op = iter.next(); - if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) { - list.add(op.data); - } - } - MessageAck rc[] = new MessageAck[list.size()]; - list.toArray(rc); - return rc; - } - - public ArrayList getOperations() { - return operations; - } - - } - - public AMQTransactionStore(AMQPersistenceAdapter adapter) { - this.peristenceAdapter = adapter; + public AMQTransactionStore(AMQPersistenceAdapter adapter){ + this.peristenceAdapter=adapter; } /** @@ -132,7 +52,7 @@ public class AMQTransactionStore implements TransactionStore { * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ public void prepare(TransactionId txid) throws IOException{ - Tx tx=null; + AMQTx tx=null; synchronized(inflightTransactions){ tx=inflightTransactions.remove(txid); } @@ -143,13 +63,13 @@ public class AMQTransactionStore implements TransactionStore { preparedTransactions.put(txid,tx); } } - + /** * @throws IOException * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ public void replayPrepare(TransactionId txid) throws IOException{ - Tx tx=null; + AMQTx tx=null; synchronized(inflightTransactions){ tx=inflightTransactions.remove(txid); } @@ -160,13 +80,13 @@ public class AMQTransactionStore implements TransactionStore { } } - public Tx getTx(TransactionId txid,Location location){ - Tx tx=null; + public AMQTx getTx(TransactionId txid,Location location){ + AMQTx tx=null; synchronized(inflightTransactions){ tx=inflightTransactions.get(txid); } if(tx==null){ - tx=new Tx(location); + tx=new AMQTx(location); inflightTransactions.put(txid,tx); } return tx; @@ -177,7 +97,7 @@ public class AMQTransactionStore implements TransactionStore { * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) */ public void commit(TransactionId txid,boolean wasPrepared) throws IOException{ - Tx tx; + AMQTx tx; if(wasPrepared){ synchronized(preparedTransactions){ tx=preparedTransactions.remove(txid); @@ -201,7 +121,7 @@ public class AMQTransactionStore implements TransactionStore { * @throws XAException * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) */ - public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{ + public AMQTx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{ if(wasPrepared){ synchronized(preparedTransactions){ return preparedTransactions.remove(txid); @@ -218,7 +138,7 @@ public class AMQTransactionStore implements TransactionStore { * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ public void rollback(TransactionId txid) throws IOException{ - Tx tx=null; + AMQTx tx=null; synchronized(inflightTransactions){ tx=inflightTransactions.remove(txid); } @@ -251,13 +171,13 @@ public class AMQTransactionStore implements TransactionStore { } } } - - public void start() throws Exception { + + public void start() throws Exception{ } - public void stop() throws Exception { + public void stop() throws Exception{ } - + synchronized public void recover(TransactionRecoveryListener listener) throws IOException{ // All the in-flight transactions get rolled back.. synchronized(inflightTransactions){ @@ -265,13 +185,13 @@ public class AMQTransactionStore implements TransactionStore { } this.doingRecover=true; try{ - Map txs=null; + Map txs=null; synchronized(preparedTransactions){ - txs=new LinkedHashMap(preparedTransactions); + txs=new LinkedHashMap(preparedTransactions); } for(Iterator iter=txs.keySet().iterator();iter.hasNext();){ Object txid=iter.next(); - Tx tx=txs.get(txid); + AMQTx tx=txs.get(txid); listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks()); } }finally{ @@ -283,26 +203,24 @@ public class AMQTransactionStore implements TransactionStore { * @param message * @throws IOException */ - void addMessage(AMQMessageStore store, Message message, Location location) throws IOException { - Tx tx = getTx(message.getTransactionId(), location); - tx.add(store, message, location); + void addMessage(AMQMessageStore store,Message message,Location location) throws IOException{ + AMQTx tx=getTx(message.getTransactionId(),location); + tx.add(store,message,location); } /** * @param ack * @throws IOException */ - public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException { - Tx tx = getTx(ack.getTransactionId(), location); - tx.add(store, ack); - } - - - public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) { - Tx tx = getTx(ack.getTransactionId(), location); - tx.add(store, ack); + public void removeMessage(AMQMessageStore store,MessageAck ack,Location location) throws IOException{ + AMQTx tx=getTx(ack.getTransactionId(),location); + tx.add(store,ack); } + public void acknowledge(AMQTopicMessageStore store,JournalTopicAck ack,Location location){ + AMQTx tx=getTx(ack.getTransactionId(),location); + tx.add(store,ack); + } public Location checkpoint() throws IOException{ // Nothing really to checkpoint.. since, we don't @@ -312,18 +230,18 @@ public class AMQTransactionStore implements TransactionStore { // roll over active tx records. Location rc=null; synchronized(inflightTransactions){ - for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){ - Tx tx=iter.next(); - Location location=tx.location; + for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){ + AMQTx tx=iter.next(); + Location location=tx.getLocation(); if(rc==null||rc.compareTo(location)<0){ rc=location; } } } synchronized(preparedTransactions){ - for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){ - Tx tx=iter.next(); - Location location=tx.location; + for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){ + AMQTx tx=iter.next(); + Location location=tx.getLocation(); if(rc==null||rc.compareTo(location)<0){ rc=location; } @@ -332,9 +250,24 @@ public class AMQTransactionStore implements TransactionStore { } } - public boolean isDoingRecover() { + public boolean isDoingRecover(){ return doingRecover; } + /** + * @return the preparedTransactions + */ + public Map getPreparedTransactions(){ + return this.preparedTransactions; + } + /** + * @param preparedTransactions the preparedTransactions to set + */ + public void setPreparedTransactions(Map preparedTransactions){ + if(preparedTransactions!=null){ + this.preparedTransactions.clear(); + this.preparedTransactions.putAll(preparedTransactions); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java index e6143e9dd0..5be79065bf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java @@ -15,10 +15,84 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.activemq.store.amq; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.kaha.impl.async.Location; + + /** - * @version $Revision: 1.1 $ */ -public interface AMQTx { +/** + * Operations + * @version $Revision: 1.6 $ + */ +public class AMQTx{ + + private final Location location; + private ArrayList operations=new ArrayList(); + + public AMQTx(Location location){ + this.location=location; + } + + public void add(AMQMessageStore store,Message msg,Location location){ + operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE,store.getDestination(),msg,location)); + } + + public void add(AMQMessageStore store,MessageAck ack){ + operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE,store.getDestination(),ack,null)); + } + + public void add(AMQTopicMessageStore store,JournalTopicAck ack){ + operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE,store.getDestination(),ack,null)); + } + + public Message[] getMessages(){ + ArrayList list=new ArrayList(); + for(Iterator iter=operations.iterator();iter.hasNext();){ + AMQTxOperation op=iter.next(); + if(op.getOperationType()==AMQTxOperation.ADD_OPERATION_TYPE){ + list.add(op.getData()); + } + } + Message rc[]=new Message[list.size()]; + list.toArray(rc); + return rc; + } + + public MessageAck[] getAcks(){ + ArrayList list=new ArrayList(); + for(Iterator iter=operations.iterator();iter.hasNext();){ + AMQTxOperation op=iter.next(); + if(op.getOperationType()==AMQTxOperation.REMOVE_OPERATION_TYPE){ + list.add(op.getData()); + } + } + MessageAck rc[]=new MessageAck[list.size()]; + list.toArray(rc); + return rc; + } + + /** + * @return the location + */ + public Location getLocation(){ + return this.location; + } + + public ArrayList getOperations(){ + return operations; + } + + public void setOperations(ArrayList operations){ + this.operations=operations; + } } + + \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java new file mode 100644 index 0000000000..637679a992 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.amq; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; + + +/** + */ +public class AMQTxOperation { + + public static final byte ADD_OPERATION_TYPE=0; + public static final byte REMOVE_OPERATION_TYPE=1; + public static final byte ACK_OPERATION_TYPE=3; + private byte operationType; + private ActiveMQDestination destination; + private Object data; + private Location location; + + public AMQTxOperation() { + } + + public AMQTxOperation(byte operationType,ActiveMQDestination destination,Object data,Location location){ + this.operationType=operationType; + this.destination=destination; + this.data=data; + this.location=location; + + } + + /** + * @return the data + */ + public Object getData(){ + return this.data; + } + + /** + * @param data the data to set + */ + public void setData(Object data){ + this.data=data; + } + + /** + * @return the location + */ + public Location getLocation(){ + return this.location; + } + + /** + * @param location the location to set + */ + public void setLocation(Location location){ + this.location=location; + } + + /** + * @return the operationType + */ + public byte getOperationType(){ + return this.operationType; + } + + /** + * @param operationType the operationType to set + */ + public void setOperationType(byte operationType){ + this.operationType=operationType; + } + + + public boolean replay(AMQPersistenceAdapter adapter,ConnectionContext context) throws IOException{ + boolean result=false; + AMQMessageStore store=(AMQMessageStore)adapter.createMessageStore(destination); + if(operationType==ADD_OPERATION_TYPE){ + result=store.replayAddMessage(context,(Message)data,location); + }else if(operationType==REMOVE_OPERATION_TYPE){ + result=store.replayRemoveMessage(context,(MessageAck)data); + }else{ + JournalTopicAck ack=(JournalTopicAck)data; + result=((AMQTopicMessageStore)store).replayAcknowledge(context,ack.getClientId(),ack.getSubscritionName(), + ack.getMessageId()); + } + return result; + } + + public void writeExternal(WireFormat wireFormat,DataOutput dos) throws IOException { + location.writeExternal(dos); + ByteSequence packet = wireFormat.marshal(getData()); + dos.writeInt(packet.length); + dos.write(packet.data, packet.offset, packet.length); + packet = wireFormat.marshal(destination); + dos.writeInt(packet.length); + dos.write(packet.data, packet.offset, packet.length); + } + + public void readExternal(WireFormat wireFormat,DataInput dis) throws IOException { + this.location=new Location(); + this.location.readExternal(dis); + int size=dis.readInt(); + byte[] data=new byte[size]; + dis.readFully(data); + setData(wireFormat.unmarshal(new ByteSequence(data))); + size=dis.readInt(); + data=new byte[size]; + dis.readFully(data); + this.destination=(ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data)); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java new file mode 100644 index 0000000000..5958ea8533 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java @@ -0,0 +1,63 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadaptor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.store.amq.AMQTx; +import org.apache.activemq.store.amq.AMQTxOperation; +import org.apache.activemq.wireformat.WireFormat; + +/** + * Marshall an AMQTx + * @version $Revision: 1.10 $ + */ +public class AMQTxMarshaller implements Marshaller{ + + private WireFormat wireFormat; + + public AMQTxMarshaller(WireFormat wireFormat){ + this.wireFormat=wireFormat; + } + + public AMQTx readPayload(DataInput dataIn) throws IOException{ + Location location=new Location(); + location.readExternal(dataIn); + AMQTx result=new AMQTx(location); + int size=dataIn.readInt(); + for(int i=0;i list=amqtx.getOperations(); + dataOut.writeInt(list.size()); + for(AMQTxOperation op:list){ + op.writeExternal(wireFormat,dataOut); + } + } +}