Additional fixes related to AMQ-4563: You can now configure the storeOpenWireVersion property of a broker to control which version of openwire is used by the persistence stores. This needs to be set to version 10 to preserve the original AMQP message ids.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1488375 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-31 19:56:03 +00:00
parent 34e8331829
commit ec9975c36e
9 changed files with 55 additions and 6 deletions

View File

@ -92,6 +92,7 @@ import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector; import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.proxy.ProxyConnector; import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
@ -239,6 +240,8 @@ public class BrokerService implements Service {
private boolean restartAllowed = true; private boolean restartAllowed = true;
private boolean restartRequested = false; private boolean restartRequested = false;
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
static { static {
try { try {
@ -2880,4 +2883,12 @@ public class BrokerService implements Service {
public void requestRestart() { public void requestRestart() {
this.restartRequested = true; this.restartRequested = true;
} }
public int getStoreOpenWireVersion() {
return storeOpenWireVersion;
}
public void setStoreOpenWireVersion(int storeOpenWireVersion) {
this.storeOpenWireVersion = storeOpenWireVersion;
}
} }

View File

@ -83,6 +83,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
@Override @Override
public void start() throws Exception { public void start() throws Exception {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
if( this.broker != null) {
wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion());
}
super.start(); super.start();
if (systemUsage != null) { if (systemUsage != null) {
systemUsage.getMemoryUsage().addUsageListener(this); systemUsage.getMemoryUsage().addUsageListener(this);

View File

@ -65,6 +65,8 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
this.context.setBroker(next); this.context.setBroker(next);
this.systemUsage = brokerService.getSystemUsage(); this.systemUsage = brokerService.getSystemUsage();
wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
} }
public synchronized JobScheduler getJobScheduler() throws Exception { public synchronized JobScheduler getJobScheduler() throws Exception {

View File

@ -304,6 +304,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
} }
public void doStart() throws Exception { public void doStart() throws Exception {
if( brokerService!=null ) {
wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
}
// Cleanup the db periodically. // Cleanup the db periodically.
if (cleanupPeriod > 0) { if (cleanupPeriod > 0) {
cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {

View File

@ -241,6 +241,10 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return; return;
} }
if( brokerService!=null ) {
wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
public boolean iterate() { public boolean iterate() {
return doCheckpoint(); return doCheckpoint();

View File

@ -177,6 +177,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override @Override
public void doStart() throws Exception { public void doStart() throws Exception {
if( brokerService!=null ) {
wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
}
super.doStart(); super.doStart();
this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());

View File

@ -62,13 +62,16 @@ import org.slf4j.LoggerFactory;
public class KahaDBTransactionStore implements TransactionStore { public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final WireFormat wireFormat = new OpenWireFormat();
private final KahaDBStore theStore; private final KahaDBStore theStore;
public KahaDBTransactionStore(KahaDBStore theStore) { public KahaDBTransactionStore(KahaDBStore theStore) {
this.theStore = theStore; this.theStore = theStore;
} }
private WireFormat wireFormat(){
return this.theStore.wireFormat;
}
public class Tx { public class Tx {
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
@ -335,13 +338,13 @@ public class KahaDBTransactionStore implements TransactionStore {
for (Operation op : entry.getValue()) { for (Operation op : entry.getValue()) {
if (op.getClass() == AddOpperation.class) { if (op.getClass() == AddOpperation.class) {
AddOpperation addOp = (AddOpperation) op; AddOpperation addOp = (AddOpperation) op;
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage() Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage()
.newInput())); .newInput()));
messageList.add(msg); messageList.add(msg);
} else { } else {
RemoveOpperation rmOp = (RemoveOpperation) op; RemoveOpperation rmOp = (RemoveOpperation) op;
Buffer ackb = rmOp.getCommand().getAck(); Buffer ackb = rmOp.getCommand().getAck();
MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput())); MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
ackList.add(ack); ackList.add(ack);
} }
} }

View File

@ -24,6 +24,9 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -60,9 +63,10 @@ import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.page.Transaction;
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter { public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
private final WireFormat wireFormat = new OpenWireFormat(); private final WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
public void setBrokerName(String brokerName) { public void setBrokerName(String brokerName) {
} }
@ -575,5 +579,17 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
public long getLastProducerSequenceId(ProducerId id) { public long getLastProducerSequenceId(ProducerId id) {
return -1; return -1;
} }
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public void load() throws IOException {
if( brokerService!=null ) {
wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
}
super.load();
}
} }

View File

@ -206,7 +206,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
var snappyCompressLogs = false var snappyCompressLogs = false
def doStart: Unit = { def doStart: Unit = {
if( brokerService!=null ) {
wireFormat.setVersion(brokerService.getStoreOpenWireVersion)
}
snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
debug("starting") debug("starting")