From 13044deccea1e1bd6849dbe0babb6720aed634ac Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 8 Jul 2015 17:29:32 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5848 Use the latest openwire version marshallers in the KahaDB store when starting from a clean install, drop back to the version used in the existing store if one is found. --- .../activemq/transport/amqp/AMQ4563Test.java | 5 - .../transport/amqp/AmqpTestSupport.java | 6 - .../apache/activemq/broker/BrokerService.java | 2 +- .../apache/activemq/command/CommandTypes.java | 4 +- .../activemq/openwire/OpenWireFormat.java | 44 +++-- .../activemq/store/kahadb/KahaDBStore.java | 10 +- .../store/kahadb/MessageDatabase.java | 4 +- .../activemq/store/kahadb/AMQ5626Test.java | 63 ++---- ...JournalCorruptionEofIndexRecoveryTest.java | 43 ++-- .../JournalCorruptionIndexRecoveryTest.java | 42 ++-- .../KahaDBStoreOpenWireVersionTest.java | 187 ++++++++++++++++++ .../transport/tcp/InactivityMonitorTest.java | 29 ++- 12 files changed, 298 insertions(+), 141 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java index 099944d0d5..6520caa3c7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java @@ -222,9 +222,4 @@ public class AMQ4563Test extends AmqpTestSupport { protected boolean isPersistent() { return true; } - - @Override - protected int getStoreOpenWireVersion() { - return 10; - } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 615376290a..91909d4498 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -45,7 +45,6 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; -import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.transport.amqp.protocol.AmqpConnection; @@ -102,7 +101,6 @@ public class AmqpTestSupport { KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName())); brokerService.setPersistenceAdapter(kaha); - brokerService.setStoreOpenWireVersion(getStoreOpenWireVersion()); } brokerService.setSchedulerSupport(false); brokerService.setAdvisorySupport(false); @@ -188,10 +186,6 @@ public class AmqpTestSupport { return true; } - protected int getStoreOpenWireVersion() { - return OpenWireFormat.DEFAULT_WIRE_VERSION; - } - protected boolean isUseOpenWireConnector() { return false; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 8aa6f9520a..0290a76eb5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -251,7 +251,7 @@ public class BrokerService implements Service { private boolean restartRequested = false; private boolean rejectDurableConsumers = false; - private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; + private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; static { diff --git a/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java b/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java index 49ed9cbd1a..e4bf463de2 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/CommandTypes.java @@ -27,8 +27,10 @@ public interface CommandTypes { byte PROTOCOL_VERSION = 11; // What is the latest version of the openwire protocol used in the stores - byte PROTOCOL_STORE_VERSION = 6; + byte PROTOCOL_STORE_VERSION = 11; + // What is the legacy version that old KahaDB store's most commonly used + byte PROTOCOL_LEGACY_STORE_VERSION = 6; // A marshaling layer can use this type to specify a null object. byte NULL = 0; diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index e605a4eb2e..f70bff840c 100755 --- a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -33,13 +33,14 @@ import org.apache.activemq.util.DataByteArrayOutputStream; import org.apache.activemq.wireformat.WireFormat; /** - * - * + * + * */ public final class OpenWireFormat implements WireFormat { - public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; + public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; + public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION; public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; static final byte NULL_TYPE = CommandTypes.NULL; @@ -64,15 +65,16 @@ public final class OpenWireFormat implements WireFormat { private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); private WireFormatInfo preferedWireFormatInfo; - + public OpenWireFormat() { - this(DEFAULT_VERSION); + this(DEFAULT_STORE_VERSION); } public OpenWireFormat(int i) { setVersion(i); } + @Override public int hashCode() { return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) @@ -91,6 +93,7 @@ public final class OpenWireFormat implements WireFormat { return answer; } + @Override public boolean equals(Object object) { if (object == null) { return false; @@ -102,6 +105,7 @@ public final class OpenWireFormat implements WireFormat { } + @Override public String toString() { return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; @@ -109,10 +113,12 @@ public final class OpenWireFormat implements WireFormat { // tightEncodingEnabled="+tightEncodingEnabled+"}"; } + @Override public int getVersion() { return version; } + @Override public synchronized ByteSequence marshal(Object command) throws IOException { if (cacheEnabled) { @@ -125,7 +131,7 @@ public final class OpenWireFormat implements WireFormat { DataStructure c = (DataStructure)command; byte type = c.getDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } @@ -173,6 +179,7 @@ public final class OpenWireFormat implements WireFormat { return sequence; } + @Override public synchronized Object unmarshal(ByteSequence sequence) throws IOException { bytesIn.restart(sequence); // DataInputStream dis = new DataInputStream(new @@ -197,6 +204,7 @@ public final class OpenWireFormat implements WireFormat { return command; } + @Override public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { if (cacheEnabled) { @@ -208,7 +216,7 @@ public final class OpenWireFormat implements WireFormat { DataStructure c = (DataStructure)o; byte type = c.getDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } @@ -246,12 +254,13 @@ public final class OpenWireFormat implements WireFormat { } else { if (!sizePrefixDisabled) { - dataOut.writeInt(size); + dataOut.writeInt(size); } dataOut.writeByte(NULL_TYPE); } } + @Override public Object unmarshal(DataInput dis) throws IOException { DataInput dataIn = dis; if (!sizePrefixDisabled) { @@ -276,7 +285,7 @@ public final class OpenWireFormat implements WireFormat { if (o != null) { DataStructure c = (DataStructure)o; byte type = c.getDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } @@ -299,7 +308,7 @@ public final class OpenWireFormat implements WireFormat { if (o != null) { DataStructure c = (DataStructure)o; byte type = c.getDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } @@ -312,9 +321,10 @@ public final class OpenWireFormat implements WireFormat { /** * Allows you to dynamically switch the version of the openwire protocol * being used. - * + * * @param version */ + @Override public void setVersion(int version) { String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; Class mfClass; @@ -343,7 +353,7 @@ public final class OpenWireFormat implements WireFormat { public Object doUnmarshal(DataInput dis) throws IOException { byte dataType = dis.readByte(); if (dataType != NULL_TYPE) { - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + dataType); } @@ -382,7 +392,7 @@ public final class OpenWireFormat implements WireFormat { } byte type = o.getDataStructureType(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } @@ -409,7 +419,7 @@ public final class OpenWireFormat implements WireFormat { } else { - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } @@ -422,7 +432,7 @@ public final class OpenWireFormat implements WireFormat { if (bs.readBoolean()) { byte dataType = dis.readByte(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + dataType); } @@ -455,7 +465,7 @@ public final class OpenWireFormat implements WireFormat { if (dis.readBoolean()) { byte dataType = dis.readByte(); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + dataType); } @@ -473,7 +483,7 @@ public final class OpenWireFormat implements WireFormat { if (o != null) { byte type = o.getDataStructureType(); dataOut.writeByte(type); - DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; + DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; if (dsm == null) { throw new IOException("Unknown data type: " + type); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 89ee40cc19..bf14d69686 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -209,9 +209,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // In case the recovered store used a different OpenWire version log a warning // to assist in determining why journal reads fail. if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { - LOG.warn("Recovered Store uses a different OpenWire version[{}] " + - "than the version configured[{}].", + LOG.warn("Existing Store uses a different OpenWire version[{}] " + + "than the version configured[{}] reverting to the version " + + "used by this store, some newer broker features may not work" + + "as expected.", metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); + + // Update the broker service instance to the actual version in use. + wireFormat.setVersion(metadata.openwireVersion); + brokerService.setStoreOpenWireVersion(metadata.openwireVersion); } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 2fa4bb15dd..7aa36c3183 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -132,7 +132,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); protected transient Map> ackMessageFileMap = new HashMap>(); protected int version = VERSION; - protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION; + protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; public void read(DataInput is) throws IOException { state = is.readInt(); @@ -168,7 +168,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { openwireVersion = is.readInt(); } catch (EOFException expectedOnUpgrade) { - openwireVersion = OpenWireFormat.DEFAULT_VERSION; + openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; } LOG.info("KahaDB is version " + version); } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java index 9097af7590..43cbf132cb 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/AMQ5626Test.java @@ -17,16 +17,21 @@ package org.apache.activemq.store.kahadb; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; + import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.ObjectName; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -42,14 +47,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - - public class AMQ5626Test { private static final Logger LOG = LoggerFactory.getLogger(AMQ5626Test.class); - private static final String QUEUE_NAME = "TesQ"; + + private final String QUEUE_NAME = "TesQ"; + private final String KAHADB_DIRECTORY = "target/activemq-data/"; + private BrokerService brokerService; private URI brokerUri; @@ -58,6 +62,15 @@ public class AMQ5626Test { createBroker(true); } + @After + public void teardown() throws Exception { + try { + brokerService.stop(); + } catch (Exception ex) { + LOG.error("FAILED TO STOP/START BROKER EXCEPTION", ex); + } + } + private void createBroker(boolean deleteMessagesOnStart) throws Exception { brokerService = new BrokerService(); @@ -79,7 +92,7 @@ public class AMQ5626Test { transportConnector.setName("openwire"); transportConnector.setUri(new URI("tcp://0.0.0.0:0")); brokerService.addConnector(transportConnector); - + brokerService.setDataDirectory(KAHADB_DIRECTORY); brokerService.setDeleteAllMessagesOnStartup(deleteMessagesOnStart); brokerService.getManagementContext().setCreateConnector(false); brokerService.start(); @@ -88,7 +101,7 @@ public class AMQ5626Test { brokerUri = transportConnector.getPublishableConnectURI(); } - @Test + @Test(timeout = 30000) public void testPriorityMessages() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri); @@ -96,9 +109,7 @@ public class AMQ5626Test { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); - Message message = session.createMessage(); // 0,1 @@ -153,33 +164,12 @@ public class AMQ5626Test { LOG.info("QueueView enqueue count : " + queueView.getEnqueueCount()); LOG.info("QueueView dequeue count : " + queueView.getDequeueCount()); LOG.info("QueueView inflight count : " + queueView.getInFlightCount()); - } } } - private QueueView getQueueView(BrokerService broker, String queueName) throws Exception { - Map queueViews = broker.getAdminView().getBroker().getQueueViews(); - - for (ObjectName key : queueViews.keySet()) { - DestinationView destinationView = queueViews.get(key); - - if (destinationView instanceof QueueView) { - QueueView queueView = (QueueView) destinationView; - - if (queueView.getName().equals(queueName)) { - return queueView; - } - - } - } - return null; - } - private synchronized void stopRestartBroker() { - try { - LOG.info(">>>SHUTTING BROKER DOWN"); brokerService.stop(); brokerService.waitUntilStopped(); @@ -190,22 +180,9 @@ public class AMQ5626Test { brokerService.waitUntilStarted(); LOG.info(">>>BROKER RESTARTED.."); - } catch (Exception e) { LOG.error("FAILED TO STOP/START BROKER EXCEPTION", e); fail("FAILED TO STOP/START BROKER" + e); } } - - @After - public void teardown() throws Exception { - - try { - brokerService.stop(); - } catch (Exception ex) { - LOG.error("FAILED TO STOP/START BROKER EXCEPTION", ex); - } - - } - } \ No newline at end of file diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index bb56e7d1ab..a39496dc10 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -16,16 +16,21 @@ */ package org.apache.activemq.store.kahadb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; @@ -40,26 +45,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - - public class JournalCorruptionEofIndexRecoveryTest { private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionEofIndexRecoveryTest.class); - ActiveMQConnectionFactory cf = null; - BrokerService broker = null; - private final Destination destination = new ActiveMQQueue("Test"); + private ActiveMQConnectionFactory cf = null; + private BrokerService broker = null; private String connectionUri; private KahaDBPersistenceAdapter adapter; + private final Destination destination = new ActiveMQQueue("Test"); + private final String KAHADB_DIRECTORY = "target/activemq-data/"; + private final String payload = new String(new byte[1024]); protected void startBroker() throws Exception { doStartBroker(true, false); } - protected void restartBroker(boolean whackIndex) throws Exception { restartBroker(whackIndex, false); } @@ -83,6 +85,8 @@ public class JournalCorruptionEofIndexRecoveryTest { private void doStartBroker(boolean delete, boolean forceRecoverIndex) throws Exception { broker = new BrokerService(); + broker.setDataDirectory(KAHADB_DIRECTORY); + if (delete) { IOHelper.deleteChildren(broker.getPersistenceAdapter().getDirectory()); IOHelper.delete(broker.getPersistenceAdapter().getDirectory()); @@ -118,7 +122,6 @@ public class JournalCorruptionEofIndexRecoveryTest { adapter.setPreallocationStrategy("zeros"); adapter.setPreallocationScope("entire_journal"); - } @After @@ -129,7 +132,6 @@ public class JournalCorruptionEofIndexRecoveryTest { } } - @Test public void testRecoveryAfterCorruptionEof() throws Exception { startBroker(); @@ -145,9 +147,7 @@ public class JournalCorruptionEofIndexRecoveryTest { restartBroker(false); assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount()); - assertEquals("Drain", 49, drainQueue(49)); - } @Test @@ -161,9 +161,7 @@ public class JournalCorruptionEofIndexRecoveryTest { restartBroker(true); assertEquals("missing one message", 3, broker.getAdminView().getTotalMessageCount()); - assertEquals("Drain", 3, drainQueue(4)); - } @Test @@ -177,16 +175,13 @@ public class JournalCorruptionEofIndexRecoveryTest { restartBroker(false); assertEquals("unnoticed", 4, broker.getAdminView().getTotalMessageCount()); - assertEquals("Drain", 0, drainQueue(4)); // force recover index and loose one message restartBroker(false, true); assertEquals("missing one index recreation", 3, broker.getAdminView().getTotalMessageCount()); - assertEquals("Drain", 3, drainQueue(4)); - } @Test @@ -200,7 +195,6 @@ public class JournalCorruptionEofIndexRecoveryTest { restartBroker(false, true); assertEquals("Drain", numToSend, drainQueue(numToSend)); - } private void corruptBatchCheckSumSplash(int id) throws Exception{ @@ -230,7 +224,6 @@ public class JournalCorruptionEofIndexRecoveryTest { randomAccessFile.writeInt(size); randomAccessFile.getChannel().force(true); - } private void corruptBatchEndEof(int id) throws Exception{ @@ -246,7 +239,6 @@ public class JournalCorruptionEofIndexRecoveryTest { randomAccessFile.writeInt(31 * 1024 * 1024); randomAccessFile.writeLong(0l); randomAccessFile.getChannel().force(true); - } private ArrayList findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException { @@ -269,11 +261,8 @@ public class JournalCorruptionEofIndexRecoveryTest { return batchPositions; } - private int getNumberOfJournalFiles() throws IOException { - - Collection files = - ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); int reality = 0; for (DataFile file : files) { if (file != null) { @@ -283,11 +272,9 @@ public class JournalCorruptionEofIndexRecoveryTest { return reality; } - private int produceMessages(Destination destination, int numToSend) throws Exception { int sent = 0; - Connection connection = new ActiveMQConnectionFactory( - broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); + Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); connection.start(); try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -307,8 +294,6 @@ public class JournalCorruptionEofIndexRecoveryTest { return produceMessages(destination, numToSend); } - final String payload = new String(new byte[1024]); - private Message createMessage(Session session, int i) throws Exception { return session.createTextMessage(payload + "::" + i); } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java index 821dfd99b6..84c2ab5c0e 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java @@ -16,16 +16,21 @@ */ package org.apache.activemq.store.kahadb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collection; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; @@ -40,18 +45,16 @@ import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - - @RunWith(Parameterized.class) public class JournalCorruptionIndexRecoveryTest { private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionIndexRecoveryTest.class); - ActiveMQConnectionFactory cf = null; - BrokerService broker = null; + private final String KAHADB_DIRECTORY = "target/activemq-data/"; + private final String payload = new String(new byte[1024]); + + private ActiveMQConnectionFactory cf = null; + private BrokerService broker = null; private final Destination destination = new ActiveMQQueue("Test"); private String connectionUri; private KahaDBPersistenceAdapter adapter; @@ -69,7 +72,6 @@ public class JournalCorruptionIndexRecoveryTest { doStartBroker(true); } - protected void restartBroker() throws Exception { File dataDir = broker.getPersistenceAdapter().getDirectory(); @@ -83,12 +85,12 @@ public class JournalCorruptionIndexRecoveryTest { doStartBroker(false); } - private void doStartBroker(boolean delete) throws Exception { broker = new BrokerService(); broker.setDeleteAllMessagesOnStartup(delete); broker.setPersistent(true); broker.setUseJmx(true); + broker.setDataDirectory(KAHADB_DIRECTORY); broker.addConnector("tcp://localhost:0"); configurePersistence(broker); @@ -112,7 +114,6 @@ public class JournalCorruptionIndexRecoveryTest { adapter.setCheckForCorruptJournalFiles(true); adapter.setIgnoreMissingJournalfiles(true); - } @After @@ -138,11 +139,9 @@ public class JournalCorruptionIndexRecoveryTest { restartBroker(); assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount()); - assertEquals("Drain", 49, drainQueue(49)); } - @Test public void testRecoveryAfterCorruptionEnd() throws Exception { startBroker(); @@ -158,9 +157,7 @@ public class JournalCorruptionIndexRecoveryTest { restartBroker(); assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount()); - assertEquals("Drain", 49, drainQueue(49)); - } @Test @@ -180,15 +177,12 @@ public class JournalCorruptionIndexRecoveryTest { assertEquals("missing one message", 48, broker.getAdminView().getTotalMessageCount()); assertEquals("Drain", 48, drainQueue(48)); - } private void whackIndex(File dataDir) { - File indexToDelete = new File(dataDir, "db.data"); LOG.info("Whacking index: " + indexToDelete); indexToDelete.delete(); - } private void corruptBatchMiddle(int i) throws IOException { @@ -201,8 +195,7 @@ public class JournalCorruptionIndexRecoveryTest { private void corruptBatch(int id, boolean atEnd) throws IOException { - Collection files = - ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); DataFile dataFile = (DataFile) files.toArray()[id]; RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile(); @@ -232,11 +225,8 @@ public class JournalCorruptionIndexRecoveryTest { randomAccessFile.write(bla, 0, bla.length); } - private int getNumberOfJournalFiles() throws IOException { - - Collection files = - ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); int reality = 0; for (DataFile file : files) { if (file != null) { @@ -246,11 +236,9 @@ public class JournalCorruptionIndexRecoveryTest { return reality; } - private int produceMessages(Destination destination, int numToSend) throws Exception { int sent = 0; - Connection connection = new ActiveMQConnectionFactory( - broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); + Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); connection.start(); try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -270,8 +258,6 @@ public class JournalCorruptionIndexRecoveryTest { return produceMessages(destination, numToSend); } - final String payload = new String(new byte[1024]); - private Message createMessage(Session session, int i) throws Exception { return session.createTextMessage(payload + "::" + i); } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java new file mode 100644 index 0000000000..5b272dbc53 --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KahaDBStoreOpenWireVersionTest { + + private static final Logger LOG = LoggerFactory.getLogger(KahaDBStoreOpenWireVersionTest.class); + + private final String KAHADB_DIRECTORY_BASE = "./target/activemq-data/"; + private final int NUM_MESSAGES = 10; + + private BrokerService broker = null; + private String storeDir; + + @Rule public TestName name = new TestName(); + + protected BrokerService createBroker(int storeOpenWireVersion) throws Exception { + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.setDataDirectory(storeDir); + broker.setStoreOpenWireVersion(storeOpenWireVersion); + broker.start(); + broker.waitUntilStarted(); + + return broker; + } + + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Before + public void setUp() throws Exception { + LOG.info("=============== Starting test {} ================", name.getMethodName()); + storeDir = KAHADB_DIRECTORY_BASE + name.getMethodName(); + } + + @After + public void tearDown() throws Exception { + File brokerStoreDir = new File(KAHADB_DIRECTORY_BASE); + + if (broker != null) { + brokerStoreDir = broker.getPersistenceAdapter().getDirectory(); + stopBroker(); + } + + IOHelper.deleteChildren(brokerStoreDir); + IOHelper.delete(brokerStoreDir); + + LOG.info("=============== Finished test {} ================", name.getMethodName()); + } + + @Test(timeout = 60000) + public void testConfiguredVersionWorksOnReload() throws Exception { + final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION - 1; + final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION - 1; + + doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION); + } + + @Test(timeout = 60000) + public void testOlderVersionWorksWithDefaults() throws Exception { + final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_LEGACY_VERSION; + final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION; + + doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION); + } + + @Test(timeout = 60000) + public void testNewerVersionWorksWhenOlderIsConfigured() throws Exception { + final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION; + final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_LEGACY_VERSION; + + doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION); + } + + private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception { + createBroker(create); + populateStore(); + stopBroker(); + + createBroker(reload); + assertEquals(create, broker.getStoreOpenWireVersion()); + assertStoreIsUsable(); + } + + private void populateStore() throws Exception { + + ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + Connection connection = factory.createConnection(); + connection.setClientID("test"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("test.topic"); + Queue queue = session.createQueue("test.queue"); + MessageConsumer consumer = session.createDurableSubscriber(topic, "test"); + consumer.close(); + + MessageProducer producer = session.createProducer(topic); + producer.setPriority(9); + for (int i = 0; i < NUM_MESSAGES; i++) { + Message msg = session.createTextMessage("test message:" + i); + producer.send(msg); + } + LOG.info("sent {} to topic", NUM_MESSAGES); + + producer = session.createProducer(queue); + for (int i = 0; i < NUM_MESSAGES; i++) { + Message msg = session.createTextMessage("test message:" + i); + producer.send(msg); + } + LOG.info("sent {} to topic", NUM_MESSAGES); + + connection.close(); + } + + private void assertStoreIsUsable() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.setClientID("test"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("test.topic"); + Queue queue = session.createQueue("test.queue"); + + MessageConsumer queueConsumer = session.createConsumer(queue); + for (int i = 0; i < NUM_MESSAGES; ++i) { + TextMessage received = (TextMessage) queueConsumer.receive(1000); + assertNotNull(received); + } + LOG.info("Consumed {} from queue", NUM_MESSAGES); + + MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test"); + for (int i = 0; i < NUM_MESSAGES; ++i) { + TextMessage received = (TextMessage) topicConsumer.receive(1000); + assertNotNull(received); + } + LOG.info("Consumed {} from topic", NUM_MESSAGES); + + connection.close(); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index 919c5daf21..a25b094aab 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener { + private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitorTest.class); public Runnable serverRunOnCommand; @@ -54,6 +55,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra private final AtomicBoolean ignoreClientError = new AtomicBoolean(false); private final AtomicBoolean ignoreServerError = new AtomicBoolean(false); + @Override protected void setUp() throws Exception { super.setUp(); startTransportServer(); @@ -66,6 +68,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra private void startClient() throws Exception, URISyntaxException { clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000")); clientTransport.setTransportListener(new TransportListener() { + @Override public void onCommand(Object command) { clientReceiveCount.incrementAndGet(); if (clientRunOnCommand != null) { @@ -73,6 +76,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void onException(IOException error) { if (!ignoreClientError.get()) { LOG.info("Client transport error:"); @@ -81,12 +85,15 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void transportInterupted() { } + @Override public void transportResumed() { } }); + clientTransport.start(); } @@ -103,6 +110,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra serverPort = server.getSocketAddress().getPort(); } + @Override protected void tearDown() throws Exception { ignoreClientError.set(true); ignoreServerError.set(true); @@ -122,11 +130,13 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra super.tearDown(); } + @Override public void onAccept(Transport transport) { try { LOG.info("[" + getName() + "] Server Accepted a Connection"); serverTransport = transport; serverTransport.setTransportListener(new TransportListener() { + @Override public void onCommand(Object command) { serverReceiveCount.incrementAndGet(); if (serverRunOnCommand != null) { @@ -134,6 +144,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void onException(IOException error) { if (!ignoreClientError.get()) { LOG.info("Server transport error:", error); @@ -141,9 +152,11 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void transportInterupted() { } + @Override public void transportResumed() { } }); @@ -153,18 +166,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void onAcceptError(Exception error) { LOG.trace(error.toString()); } public void testClientHang() throws Exception { - - // // Manually create a client transport so that it does not send KeepAlive - // packets. - // this should simulate a client hang. + // packets. this should simulate a client hang. clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null); clientTransport.setTransportListener(new TransportListener() { + @Override public void onCommand(Object command) { clientReceiveCount.incrementAndGet(); if (clientRunOnCommand != null) { @@ -172,6 +184,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void onException(IOException error) { if (!ignoreClientError.get()) { LOG.info("Client transport error:"); @@ -180,15 +193,18 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra } } + @Override public void transportInterupted() { } + @Override public void transportResumed() { } }); + clientTransport.start(); WireFormatInfo info = new WireFormatInfo(); - info.setVersion(OpenWireFormat.DEFAULT_VERSION); + info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION); info.setMaxInactivityDuration(1000); clientTransport.oneway(info); @@ -223,12 +239,12 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra * @throws URISyntaxException */ public void initCombosForTestNoClientHangWithServerBlock() throws Exception { - startClient(); addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)}); addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)}); addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() { + @Override public void run() { try { LOG.info("Sleeping"); @@ -251,5 +267,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra assertEquals(0, clientErrorCount.get()); assertEquals(0, serverErrorCount.get()); } - }