From ec9975c36eddfeb7eb3c6339aac72dc0f71b3d5d Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 31 May 2013 19:56:03 +0000 Subject: [PATCH] Additional fixes related to AMQ-4563: You can now configure the storeOpenWireVersion property of a broker to control which version of openwire is used by the persistence stores. This needs to be set to version 10 to preserve the original AMQP message ids. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1488375 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 11 ++++++++++ .../cursors/FilePendingMessageCursor.java | 3 +++ .../broker/scheduler/SchedulerBroker.java | 2 ++ .../store/jdbc/JDBCPersistenceAdapter.java | 5 +++++ .../journal/JournalPersistenceAdapter.java | 4 ++++ .../activemq/store/kahadb/KahaDBStore.java | 3 +++ .../store/kahadb/KahaDBTransactionStore.java | 9 ++++++--- .../store/kahadb/TempKahaDBStore.java | 20 +++++++++++++++++-- .../activemq/leveldb/LevelDBStore.scala | 4 +++- 9 files changed, 55 insertions(+), 6 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 98e67665b1..83c3ded432 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -92,6 +92,7 @@ import org.apache.activemq.network.ConnectionFilter; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.jms.JmsConnector; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.proxy.ProxyConnector; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.selector.SelectorParser; @@ -239,6 +240,8 @@ public class BrokerService implements Service { private boolean restartAllowed = true; private boolean restartRequested = false; + private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; + static { try { @@ -2880,4 +2883,12 @@ public class BrokerService implements Service { public void requestRestart() { this.restartRequested = true; } + + public int getStoreOpenWireVersion() { + return storeOpenWireVersion; + } + + public void setStoreOpenWireVersion(int storeOpenWireVersion) { + this.storeOpenWireVersion = storeOpenWireVersion; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index b91e87230c..f85a04af6f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -83,6 +83,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override public void start() throws Exception { if (started.compareAndSet(false, true)) { + if( this.broker != null) { + wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion()); + } super.start(); if (systemUsage != null) { systemUsage.getMemoryUsage().addUsageListener(this); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 8b59006aa0..5460c27716 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -65,6 +65,8 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); this.context.setBroker(next); this.systemUsage = brokerService.getSystemUsage(); + + wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); } public synchronized JobScheduler getJobScheduler() throws Exception { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index edfc1a9b56..508fd55ada 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -304,6 +304,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements } public void doStart() throws Exception { + + if( brokerService!=null ) { + wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); + } + // Cleanup the db periodically. if (cleanupPeriod > 0) { cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index ca549ca174..4832472694 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -241,6 +241,10 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return; } + if( brokerService!=null ) { + wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); + } + checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { public boolean iterate() { return doCheckpoint(); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index f424ea2619..4a3ba4caac 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -177,6 +177,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public void doStart() throws Exception { + if( brokerService!=null ) { + wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); + } super.doStart(); this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index af70570000..97a4bb5b6e 100755 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -62,13 +62,16 @@ import org.slf4j.LoggerFactory; public class KahaDBTransactionStore implements TransactionStore { static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); - private final WireFormat wireFormat = new OpenWireFormat(); private final KahaDBStore theStore; public KahaDBTransactionStore(KahaDBStore theStore) { this.theStore = theStore; } + private WireFormat wireFormat(){ + return this.theStore.wireFormat; + } + public class Tx { private final ArrayList messages = new ArrayList(); @@ -335,13 +338,13 @@ public class KahaDBTransactionStore implements TransactionStore { for (Operation op : entry.getValue()) { if (op.getClass() == AddOpperation.class) { AddOpperation addOp = (AddOpperation) op; - Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage() + Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage() .newInput())); messageList.add(msg); } else { RemoveOpperation rmOp = (RemoveOpperation) op; Buffer ackb = rmOp.getCommand().getAck(); - MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput())); + MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput())); ackList.add(ack); } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index cdf4fe4dda..b68972e1b7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -24,6 +24,9 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.Map.Entry; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -60,9 +63,10 @@ import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; -public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter { +public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware { private final WireFormat wireFormat = new OpenWireFormat(); + private BrokerService brokerService; public void setBrokerName(String brokerName) { } @@ -575,5 +579,17 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA public long getLastProducerSequenceId(ProducerId id) { return -1; } - + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + @Override + public void load() throws IOException { + if( brokerService!=null ) { + wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); + } + super.load(); + } } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index d63a63d372..7ba400b5ca 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -206,7 +206,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P var snappyCompressLogs = false def doStart: Unit = { - + if( brokerService!=null ) { + wireFormat.setVersion(brokerService.getStoreOpenWireVersion) + } snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null debug("starting")