Use the latest openwire version marshallers in the KahaDB store when
starting from a clean install, drop back to the version used in the
existing store if one is found.
This commit is contained in:
Timothy Bish 2015-07-08 17:29:32 -04:00
parent b0952d8747
commit 13044decce
12 changed files with 298 additions and 141 deletions

View File

@ -222,9 +222,4 @@ public class AMQ4563Test extends AmqpTestSupport {
protected boolean isPersistent() { protected boolean isPersistent() {
return true; return true;
} }
@Override
protected int getStoreOpenWireVersion() {
return 10;
}
} }

View File

@ -45,7 +45,6 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection; import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
@ -102,7 +101,6 @@ public class AmqpTestSupport {
KahaDBStore kaha = new KahaDBStore(); KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName())); kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
brokerService.setPersistenceAdapter(kaha); brokerService.setPersistenceAdapter(kaha);
brokerService.setStoreOpenWireVersion(getStoreOpenWireVersion());
} }
brokerService.setSchedulerSupport(false); brokerService.setSchedulerSupport(false);
brokerService.setAdvisorySupport(false); brokerService.setAdvisorySupport(false);
@ -188,10 +186,6 @@ public class AmqpTestSupport {
return true; return true;
} }
protected int getStoreOpenWireVersion() {
return OpenWireFormat.DEFAULT_WIRE_VERSION;
}
protected boolean isUseOpenWireConnector() { protected boolean isUseOpenWireConnector() {
return false; return false;
} }

View File

@ -251,7 +251,7 @@ public class BrokerService implements Service {
private boolean restartRequested = false; private boolean restartRequested = false;
private boolean rejectDurableConsumers = false; private boolean rejectDurableConsumers = false;
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
static { static {

View File

@ -27,8 +27,10 @@ public interface CommandTypes {
byte PROTOCOL_VERSION = 11; byte PROTOCOL_VERSION = 11;
// What is the latest version of the openwire protocol used in the stores // What is the latest version of the openwire protocol used in the stores
byte PROTOCOL_STORE_VERSION = 6; byte PROTOCOL_STORE_VERSION = 11;
// What is the legacy version that old KahaDB store's most commonly used
byte PROTOCOL_LEGACY_STORE_VERSION = 6;
// A marshaling layer can use this type to specify a null object. // A marshaling layer can use this type to specify a null object.
byte NULL = 0; byte NULL = 0;

View File

@ -33,13 +33,14 @@ import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
/** /**
* *
* *
*/ */
public final class OpenWireFormat implements WireFormat { public final class OpenWireFormat implements WireFormat {
public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
static final byte NULL_TYPE = CommandTypes.NULL; static final byte NULL_TYPE = CommandTypes.NULL;
@ -64,15 +65,16 @@ public final class OpenWireFormat implements WireFormat {
private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
private WireFormatInfo preferedWireFormatInfo; private WireFormatInfo preferedWireFormatInfo;
public OpenWireFormat() { public OpenWireFormat() {
this(DEFAULT_VERSION); this(DEFAULT_STORE_VERSION);
} }
public OpenWireFormat(int i) { public OpenWireFormat(int i) {
setVersion(i); setVersion(i);
} }
@Override
public int hashCode() { public int hashCode() {
return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
^ (stackTraceEnabled ? 0x01000000 : 0x02000000) ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
@ -91,6 +93,7 @@ public final class OpenWireFormat implements WireFormat {
return answer; return answer;
} }
@Override
public boolean equals(Object object) { public boolean equals(Object object) {
if (object == null) { if (object == null) {
return false; return false;
@ -102,6 +105,7 @@ public final class OpenWireFormat implements WireFormat {
} }
@Override
public String toString() { public String toString() {
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
+ tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
@ -109,10 +113,12 @@ public final class OpenWireFormat implements WireFormat {
// tightEncodingEnabled="+tightEncodingEnabled+"}"; // tightEncodingEnabled="+tightEncodingEnabled+"}";
} }
@Override
public int getVersion() { public int getVersion() {
return version; return version;
} }
@Override
public synchronized ByteSequence marshal(Object command) throws IOException { public synchronized ByteSequence marshal(Object command) throws IOException {
if (cacheEnabled) { if (cacheEnabled) {
@ -125,7 +131,7 @@ public final class OpenWireFormat implements WireFormat {
DataStructure c = (DataStructure)command; DataStructure c = (DataStructure)command;
byte type = c.getDataStructureType(); byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }
@ -173,6 +179,7 @@ public final class OpenWireFormat implements WireFormat {
return sequence; return sequence;
} }
@Override
public synchronized Object unmarshal(ByteSequence sequence) throws IOException { public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
bytesIn.restart(sequence); bytesIn.restart(sequence);
// DataInputStream dis = new DataInputStream(new // DataInputStream dis = new DataInputStream(new
@ -197,6 +204,7 @@ public final class OpenWireFormat implements WireFormat {
return command; return command;
} }
@Override
public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
if (cacheEnabled) { if (cacheEnabled) {
@ -208,7 +216,7 @@ public final class OpenWireFormat implements WireFormat {
DataStructure c = (DataStructure)o; DataStructure c = (DataStructure)o;
byte type = c.getDataStructureType(); byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }
@ -246,12 +254,13 @@ public final class OpenWireFormat implements WireFormat {
} else { } else {
if (!sizePrefixDisabled) { if (!sizePrefixDisabled) {
dataOut.writeInt(size); dataOut.writeInt(size);
} }
dataOut.writeByte(NULL_TYPE); dataOut.writeByte(NULL_TYPE);
} }
} }
@Override
public Object unmarshal(DataInput dis) throws IOException { public Object unmarshal(DataInput dis) throws IOException {
DataInput dataIn = dis; DataInput dataIn = dis;
if (!sizePrefixDisabled) { if (!sizePrefixDisabled) {
@ -276,7 +285,7 @@ public final class OpenWireFormat implements WireFormat {
if (o != null) { if (o != null) {
DataStructure c = (DataStructure)o; DataStructure c = (DataStructure)o;
byte type = c.getDataStructureType(); byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }
@ -299,7 +308,7 @@ public final class OpenWireFormat implements WireFormat {
if (o != null) { if (o != null) {
DataStructure c = (DataStructure)o; DataStructure c = (DataStructure)o;
byte type = c.getDataStructureType(); byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }
@ -312,9 +321,10 @@ public final class OpenWireFormat implements WireFormat {
/** /**
* Allows you to dynamically switch the version of the openwire protocol * Allows you to dynamically switch the version of the openwire protocol
* being used. * being used.
* *
* @param version * @param version
*/ */
@Override
public void setVersion(int version) { public void setVersion(int version) {
String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
Class mfClass; Class mfClass;
@ -343,7 +353,7 @@ public final class OpenWireFormat implements WireFormat {
public Object doUnmarshal(DataInput dis) throws IOException { public Object doUnmarshal(DataInput dis) throws IOException {
byte dataType = dis.readByte(); byte dataType = dis.readByte();
if (dataType != NULL_TYPE) { if (dataType != NULL_TYPE) {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + dataType); throw new IOException("Unknown data type: " + dataType);
} }
@ -382,7 +392,7 @@ public final class OpenWireFormat implements WireFormat {
} }
byte type = o.getDataStructureType(); byte type = o.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }
@ -409,7 +419,7 @@ public final class OpenWireFormat implements WireFormat {
} else { } else {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }
@ -422,7 +432,7 @@ public final class OpenWireFormat implements WireFormat {
if (bs.readBoolean()) { if (bs.readBoolean()) {
byte dataType = dis.readByte(); byte dataType = dis.readByte();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + dataType); throw new IOException("Unknown data type: " + dataType);
} }
@ -455,7 +465,7 @@ public final class OpenWireFormat implements WireFormat {
if (dis.readBoolean()) { if (dis.readBoolean()) {
byte dataType = dis.readByte(); byte dataType = dis.readByte();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + dataType); throw new IOException("Unknown data type: " + dataType);
} }
@ -473,7 +483,7 @@ public final class OpenWireFormat implements WireFormat {
if (o != null) { if (o != null) {
byte type = o.getDataStructureType(); byte type = o.getDataStructureType();
dataOut.writeByte(type); dataOut.writeByte(type);
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF]; DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null) { if (dsm == null) {
throw new IOException("Unknown data type: " + type); throw new IOException("Unknown data type: " + type);
} }

View File

@ -209,9 +209,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
// In case the recovered store used a different OpenWire version log a warning // In case the recovered store used a different OpenWire version log a warning
// to assist in determining why journal reads fail. // to assist in determining why journal reads fail.
if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
LOG.warn("Recovered Store uses a different OpenWire version[{}] " + LOG.warn("Existing Store uses a different OpenWire version[{}] " +
"than the version configured[{}].", "than the version configured[{}] reverting to the version " +
"used by this store, some newer broker features may not work" +
"as expected.",
metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
// Update the broker service instance to the actual version in use.
wireFormat.setVersion(metadata.openwireVersion);
brokerService.setStoreOpenWireVersion(metadata.openwireVersion);
} }
} }

View File

@ -132,7 +132,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
protected int version = VERSION; protected int version = VERSION;
protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION; protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
public void read(DataInput is) throws IOException { public void read(DataInput is) throws IOException {
state = is.readInt(); state = is.readInt();
@ -168,7 +168,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try { try {
openwireVersion = is.readInt(); openwireVersion = is.readInt();
} catch (EOFException expectedOnUpgrade) { } catch (EOFException expectedOnUpgrade) {
openwireVersion = OpenWireFormat.DEFAULT_VERSION; openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
} }
LOG.info("KahaDB is version " + version); LOG.info("KahaDB is version " + version);
} }

View File

@ -17,16 +17,21 @@
package org.apache.activemq.store.kahadb; package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -42,14 +47,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public class AMQ5626Test { public class AMQ5626Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ5626Test.class); private static final Logger LOG = LoggerFactory.getLogger(AMQ5626Test.class);
private static final String QUEUE_NAME = "TesQ";
private final String QUEUE_NAME = "TesQ";
private final String KAHADB_DIRECTORY = "target/activemq-data/";
private BrokerService brokerService; private BrokerService brokerService;
private URI brokerUri; private URI brokerUri;
@ -58,6 +62,15 @@ public class AMQ5626Test {
createBroker(true); createBroker(true);
} }
@After
public void teardown() throws Exception {
try {
brokerService.stop();
} catch (Exception ex) {
LOG.error("FAILED TO STOP/START BROKER EXCEPTION", ex);
}
}
private void createBroker(boolean deleteMessagesOnStart) throws Exception { private void createBroker(boolean deleteMessagesOnStart) throws Exception {
brokerService = new BrokerService(); brokerService = new BrokerService();
@ -79,7 +92,7 @@ public class AMQ5626Test {
transportConnector.setName("openwire"); transportConnector.setName("openwire");
transportConnector.setUri(new URI("tcp://0.0.0.0:0")); transportConnector.setUri(new URI("tcp://0.0.0.0:0"));
brokerService.addConnector(transportConnector); brokerService.addConnector(transportConnector);
brokerService.setDataDirectory(KAHADB_DIRECTORY);
brokerService.setDeleteAllMessagesOnStartup(deleteMessagesOnStart); brokerService.setDeleteAllMessagesOnStartup(deleteMessagesOnStart);
brokerService.getManagementContext().setCreateConnector(false); brokerService.getManagementContext().setCreateConnector(false);
brokerService.start(); brokerService.start();
@ -88,7 +101,7 @@ public class AMQ5626Test {
brokerUri = transportConnector.getPublishableConnectURI(); brokerUri = transportConnector.getPublishableConnectURI();
} }
@Test @Test(timeout = 30000)
public void testPriorityMessages() throws Exception { public void testPriorityMessages() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri);
@ -96,9 +109,7 @@ public class AMQ5626Test {
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
Message message = session.createMessage(); Message message = session.createMessage();
// 0,1 // 0,1
@ -153,33 +164,12 @@ public class AMQ5626Test {
LOG.info("QueueView enqueue count : " + queueView.getEnqueueCount()); LOG.info("QueueView enqueue count : " + queueView.getEnqueueCount());
LOG.info("QueueView dequeue count : " + queueView.getDequeueCount()); LOG.info("QueueView dequeue count : " + queueView.getDequeueCount());
LOG.info("QueueView inflight count : " + queueView.getInFlightCount()); LOG.info("QueueView inflight count : " + queueView.getInFlightCount());
} }
} }
} }
private QueueView getQueueView(BrokerService broker, String queueName) throws Exception {
Map<ObjectName, DestinationView> queueViews = broker.getAdminView().getBroker().getQueueViews();
for (ObjectName key : queueViews.keySet()) {
DestinationView destinationView = queueViews.get(key);
if (destinationView instanceof QueueView) {
QueueView queueView = (QueueView) destinationView;
if (queueView.getName().equals(queueName)) {
return queueView;
}
}
}
return null;
}
private synchronized void stopRestartBroker() { private synchronized void stopRestartBroker() {
try { try {
LOG.info(">>>SHUTTING BROKER DOWN"); LOG.info(">>>SHUTTING BROKER DOWN");
brokerService.stop(); brokerService.stop();
brokerService.waitUntilStopped(); brokerService.waitUntilStopped();
@ -190,22 +180,9 @@ public class AMQ5626Test {
brokerService.waitUntilStarted(); brokerService.waitUntilStarted();
LOG.info(">>>BROKER RESTARTED.."); LOG.info(">>>BROKER RESTARTED..");
} catch (Exception e) { } catch (Exception e) {
LOG.error("FAILED TO STOP/START BROKER EXCEPTION", e); LOG.error("FAILED TO STOP/START BROKER EXCEPTION", e);
fail("FAILED TO STOP/START BROKER" + e); fail("FAILED TO STOP/START BROKER" + e);
} }
} }
@After
public void teardown() throws Exception {
try {
brokerService.stop();
} catch (Exception ex) {
LOG.error("FAILED TO STOP/START BROKER EXCEPTION", ex);
}
}
} }

View File

@ -16,16 +16,21 @@
*/ */
package org.apache.activemq.store.kahadb; package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -40,26 +45,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class JournalCorruptionEofIndexRecoveryTest { public class JournalCorruptionEofIndexRecoveryTest {
private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionEofIndexRecoveryTest.class); private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionEofIndexRecoveryTest.class);
ActiveMQConnectionFactory cf = null; private ActiveMQConnectionFactory cf = null;
BrokerService broker = null; private BrokerService broker = null;
private final Destination destination = new ActiveMQQueue("Test");
private String connectionUri; private String connectionUri;
private KahaDBPersistenceAdapter adapter; private KahaDBPersistenceAdapter adapter;
private final Destination destination = new ActiveMQQueue("Test");
private final String KAHADB_DIRECTORY = "target/activemq-data/";
private final String payload = new String(new byte[1024]);
protected void startBroker() throws Exception { protected void startBroker() throws Exception {
doStartBroker(true, false); doStartBroker(true, false);
} }
protected void restartBroker(boolean whackIndex) throws Exception { protected void restartBroker(boolean whackIndex) throws Exception {
restartBroker(whackIndex, false); restartBroker(whackIndex, false);
} }
@ -83,6 +85,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
private void doStartBroker(boolean delete, boolean forceRecoverIndex) throws Exception { private void doStartBroker(boolean delete, boolean forceRecoverIndex) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
broker.setDataDirectory(KAHADB_DIRECTORY);
if (delete) { if (delete) {
IOHelper.deleteChildren(broker.getPersistenceAdapter().getDirectory()); IOHelper.deleteChildren(broker.getPersistenceAdapter().getDirectory());
IOHelper.delete(broker.getPersistenceAdapter().getDirectory()); IOHelper.delete(broker.getPersistenceAdapter().getDirectory());
@ -118,7 +122,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
adapter.setPreallocationStrategy("zeros"); adapter.setPreallocationStrategy("zeros");
adapter.setPreallocationScope("entire_journal"); adapter.setPreallocationScope("entire_journal");
} }
@After @After
@ -129,7 +132,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
} }
} }
@Test @Test
public void testRecoveryAfterCorruptionEof() throws Exception { public void testRecoveryAfterCorruptionEof() throws Exception {
startBroker(); startBroker();
@ -145,9 +147,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
restartBroker(false); restartBroker(false);
assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount()); assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 49, drainQueue(49)); assertEquals("Drain", 49, drainQueue(49));
} }
@Test @Test
@ -161,9 +161,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
restartBroker(true); restartBroker(true);
assertEquals("missing one message", 3, broker.getAdminView().getTotalMessageCount()); assertEquals("missing one message", 3, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 3, drainQueue(4)); assertEquals("Drain", 3, drainQueue(4));
} }
@Test @Test
@ -177,16 +175,13 @@ public class JournalCorruptionEofIndexRecoveryTest {
restartBroker(false); restartBroker(false);
assertEquals("unnoticed", 4, broker.getAdminView().getTotalMessageCount()); assertEquals("unnoticed", 4, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 0, drainQueue(4)); assertEquals("Drain", 0, drainQueue(4));
// force recover index and loose one message // force recover index and loose one message
restartBroker(false, true); restartBroker(false, true);
assertEquals("missing one index recreation", 3, broker.getAdminView().getTotalMessageCount()); assertEquals("missing one index recreation", 3, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 3, drainQueue(4)); assertEquals("Drain", 3, drainQueue(4));
} }
@Test @Test
@ -200,7 +195,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
restartBroker(false, true); restartBroker(false, true);
assertEquals("Drain", numToSend, drainQueue(numToSend)); assertEquals("Drain", numToSend, drainQueue(numToSend));
} }
private void corruptBatchCheckSumSplash(int id) throws Exception{ private void corruptBatchCheckSumSplash(int id) throws Exception{
@ -230,7 +224,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
randomAccessFile.writeInt(size); randomAccessFile.writeInt(size);
randomAccessFile.getChannel().force(true); randomAccessFile.getChannel().force(true);
} }
private void corruptBatchEndEof(int id) throws Exception{ private void corruptBatchEndEof(int id) throws Exception{
@ -246,7 +239,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
randomAccessFile.writeInt(31 * 1024 * 1024); randomAccessFile.writeInt(31 * 1024 * 1024);
randomAccessFile.writeLong(0l); randomAccessFile.writeLong(0l);
randomAccessFile.getChannel().force(true); randomAccessFile.getChannel().force(true);
} }
private ArrayList<Integer> findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException { private ArrayList<Integer> findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException {
@ -269,11 +261,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
return batchPositions; return batchPositions;
} }
private int getNumberOfJournalFiles() throws IOException { private int getNumberOfJournalFiles() throws IOException {
Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
Collection<DataFile> files =
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
int reality = 0; int reality = 0;
for (DataFile file : files) { for (DataFile file : files) {
if (file != null) { if (file != null) {
@ -283,11 +272,9 @@ public class JournalCorruptionEofIndexRecoveryTest {
return reality; return reality;
} }
private int produceMessages(Destination destination, int numToSend) throws Exception { private int produceMessages(Destination destination, int numToSend) throws Exception {
int sent = 0; int sent = 0;
Connection connection = new ActiveMQConnectionFactory( Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
connection.start(); connection.start();
try { try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -307,8 +294,6 @@ public class JournalCorruptionEofIndexRecoveryTest {
return produceMessages(destination, numToSend); return produceMessages(destination, numToSend);
} }
final String payload = new String(new byte[1024]);
private Message createMessage(Session session, int i) throws Exception { private Message createMessage(Session session, int i) throws Exception {
return session.createTextMessage(payload + "::" + i); return session.createTextMessage(payload + "::" + i);
} }

View File

@ -16,16 +16,21 @@
*/ */
package org.apache.activemq.store.kahadb; package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -40,18 +45,16 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class JournalCorruptionIndexRecoveryTest { public class JournalCorruptionIndexRecoveryTest {
private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionIndexRecoveryTest.class); private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionIndexRecoveryTest.class);
ActiveMQConnectionFactory cf = null; private final String KAHADB_DIRECTORY = "target/activemq-data/";
BrokerService broker = null; private final String payload = new String(new byte[1024]);
private ActiveMQConnectionFactory cf = null;
private BrokerService broker = null;
private final Destination destination = new ActiveMQQueue("Test"); private final Destination destination = new ActiveMQQueue("Test");
private String connectionUri; private String connectionUri;
private KahaDBPersistenceAdapter adapter; private KahaDBPersistenceAdapter adapter;
@ -69,7 +72,6 @@ public class JournalCorruptionIndexRecoveryTest {
doStartBroker(true); doStartBroker(true);
} }
protected void restartBroker() throws Exception { protected void restartBroker() throws Exception {
File dataDir = broker.getPersistenceAdapter().getDirectory(); File dataDir = broker.getPersistenceAdapter().getDirectory();
@ -83,12 +85,12 @@ public class JournalCorruptionIndexRecoveryTest {
doStartBroker(false); doStartBroker(false);
} }
private void doStartBroker(boolean delete) throws Exception { private void doStartBroker(boolean delete) throws Exception {
broker = new BrokerService(); broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(delete); broker.setDeleteAllMessagesOnStartup(delete);
broker.setPersistent(true); broker.setPersistent(true);
broker.setUseJmx(true); broker.setUseJmx(true);
broker.setDataDirectory(KAHADB_DIRECTORY);
broker.addConnector("tcp://localhost:0"); broker.addConnector("tcp://localhost:0");
configurePersistence(broker); configurePersistence(broker);
@ -112,7 +114,6 @@ public class JournalCorruptionIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true); adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(true); adapter.setIgnoreMissingJournalfiles(true);
} }
@After @After
@ -138,11 +139,9 @@ public class JournalCorruptionIndexRecoveryTest {
restartBroker(); restartBroker();
assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount()); assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 49, drainQueue(49)); assertEquals("Drain", 49, drainQueue(49));
} }
@Test @Test
public void testRecoveryAfterCorruptionEnd() throws Exception { public void testRecoveryAfterCorruptionEnd() throws Exception {
startBroker(); startBroker();
@ -158,9 +157,7 @@ public class JournalCorruptionIndexRecoveryTest {
restartBroker(); restartBroker();
assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount()); assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 49, drainQueue(49)); assertEquals("Drain", 49, drainQueue(49));
} }
@Test @Test
@ -180,15 +177,12 @@ public class JournalCorruptionIndexRecoveryTest {
assertEquals("missing one message", 48, broker.getAdminView().getTotalMessageCount()); assertEquals("missing one message", 48, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 48, drainQueue(48)); assertEquals("Drain", 48, drainQueue(48));
} }
private void whackIndex(File dataDir) { private void whackIndex(File dataDir) {
File indexToDelete = new File(dataDir, "db.data"); File indexToDelete = new File(dataDir, "db.data");
LOG.info("Whacking index: " + indexToDelete); LOG.info("Whacking index: " + indexToDelete);
indexToDelete.delete(); indexToDelete.delete();
} }
private void corruptBatchMiddle(int i) throws IOException { private void corruptBatchMiddle(int i) throws IOException {
@ -201,8 +195,7 @@ public class JournalCorruptionIndexRecoveryTest {
private void corruptBatch(int id, boolean atEnd) throws IOException { private void corruptBatch(int id, boolean atEnd) throws IOException {
Collection<DataFile> files = Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
DataFile dataFile = (DataFile) files.toArray()[id]; DataFile dataFile = (DataFile) files.toArray()[id];
RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile(); RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
@ -232,11 +225,8 @@ public class JournalCorruptionIndexRecoveryTest {
randomAccessFile.write(bla, 0, bla.length); randomAccessFile.write(bla, 0, bla.length);
} }
private int getNumberOfJournalFiles() throws IOException { private int getNumberOfJournalFiles() throws IOException {
Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
Collection<DataFile> files =
((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
int reality = 0; int reality = 0;
for (DataFile file : files) { for (DataFile file : files) {
if (file != null) { if (file != null) {
@ -246,11 +236,9 @@ public class JournalCorruptionIndexRecoveryTest {
return reality; return reality;
} }
private int produceMessages(Destination destination, int numToSend) throws Exception { private int produceMessages(Destination destination, int numToSend) throws Exception {
int sent = 0; int sent = 0;
Connection connection = new ActiveMQConnectionFactory( Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
connection.start(); connection.start();
try { try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -270,8 +258,6 @@ public class JournalCorruptionIndexRecoveryTest {
return produceMessages(destination, numToSend); return produceMessages(destination, numToSend);
} }
final String payload = new String(new byte[1024]);
private Message createMessage(Session session, int i) throws Exception { private Message createMessage(Session session, int i) throws Exception {
return session.createTextMessage(payload + "::" + i); return session.createTextMessage(payload + "::" + i);
} }

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KahaDBStoreOpenWireVersionTest {
private static final Logger LOG = LoggerFactory.getLogger(KahaDBStoreOpenWireVersionTest.class);
private final String KAHADB_DIRECTORY_BASE = "./target/activemq-data/";
private final int NUM_MESSAGES = 10;
private BrokerService broker = null;
private String storeDir;
@Rule public TestName name = new TestName();
protected BrokerService createBroker(int storeOpenWireVersion) throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setAdvisorySupport(false);
broker.setDataDirectory(storeDir);
broker.setStoreOpenWireVersion(storeOpenWireVersion);
broker.start();
broker.waitUntilStarted();
return broker;
}
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Before
public void setUp() throws Exception {
LOG.info("=============== Starting test {} ================", name.getMethodName());
storeDir = KAHADB_DIRECTORY_BASE + name.getMethodName();
}
@After
public void tearDown() throws Exception {
File brokerStoreDir = new File(KAHADB_DIRECTORY_BASE);
if (broker != null) {
brokerStoreDir = broker.getPersistenceAdapter().getDirectory();
stopBroker();
}
IOHelper.deleteChildren(brokerStoreDir);
IOHelper.delete(brokerStoreDir);
LOG.info("=============== Finished test {} ================", name.getMethodName());
}
@Test(timeout = 60000)
public void testConfiguredVersionWorksOnReload() throws Exception {
final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION - 1;
final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION - 1;
doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
}
@Test(timeout = 60000)
public void testOlderVersionWorksWithDefaults() throws Exception {
final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_LEGACY_VERSION;
final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION;
doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
}
@Test(timeout = 60000)
public void testNewerVersionWorksWhenOlderIsConfigured() throws Exception {
final int INITIAL_STORE_VERSION = OpenWireFormat.DEFAULT_STORE_VERSION;
final int RELOAD_STORE_VERSION = OpenWireFormat.DEFAULT_LEGACY_VERSION;
doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
}
private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception {
createBroker(create);
populateStore();
stopBroker();
createBroker(reload);
assertEquals(create, broker.getStoreOpenWireVersion());
assertStoreIsUsable();
}
private void populateStore() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
Connection connection = factory.createConnection();
connection.setClientID("test");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
Queue queue = session.createQueue("test.queue");
MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
consumer.close();
MessageProducer producer = session.createProducer(topic);
producer.setPriority(9);
for (int i = 0; i < NUM_MESSAGES; i++) {
Message msg = session.createTextMessage("test message:" + i);
producer.send(msg);
}
LOG.info("sent {} to topic", NUM_MESSAGES);
producer = session.createProducer(queue);
for (int i = 0; i < NUM_MESSAGES; i++) {
Message msg = session.createTextMessage("test message:" + i);
producer.send(msg);
}
LOG.info("sent {} to topic", NUM_MESSAGES);
connection.close();
}
private void assertStoreIsUsable() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.setClientID("test");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
Queue queue = session.createQueue("test.queue");
MessageConsumer queueConsumer = session.createConsumer(queue);
for (int i = 0; i < NUM_MESSAGES; ++i) {
TextMessage received = (TextMessage) queueConsumer.receive(1000);
assertNotNull(received);
}
LOG.info("Consumed {} from queue", NUM_MESSAGES);
MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
for (int i = 0; i < NUM_MESSAGES; ++i) {
TextMessage received = (TextMessage) topicConsumer.receive(1000);
assertNotNull(received);
}
LOG.info("Consumed {} from topic", NUM_MESSAGES);
connection.close();
}
}

View File

@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener { public class InactivityMonitorTest extends CombinationTestSupport implements TransportAcceptListener {
private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitorTest.class); private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitorTest.class);
public Runnable serverRunOnCommand; public Runnable serverRunOnCommand;
@ -54,6 +55,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
private final AtomicBoolean ignoreClientError = new AtomicBoolean(false); private final AtomicBoolean ignoreClientError = new AtomicBoolean(false);
private final AtomicBoolean ignoreServerError = new AtomicBoolean(false); private final AtomicBoolean ignoreServerError = new AtomicBoolean(false);
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
startTransportServer(); startTransportServer();
@ -66,6 +68,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
private void startClient() throws Exception, URISyntaxException { private void startClient() throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000")); clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
clientTransport.setTransportListener(new TransportListener() { clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) { public void onCommand(Object command) {
clientReceiveCount.incrementAndGet(); clientReceiveCount.incrementAndGet();
if (clientRunOnCommand != null) { if (clientRunOnCommand != null) {
@ -73,6 +76,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void onException(IOException error) { public void onException(IOException error) {
if (!ignoreClientError.get()) { if (!ignoreClientError.get()) {
LOG.info("Client transport error:"); LOG.info("Client transport error:");
@ -81,12 +85,15 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void transportInterupted() { public void transportInterupted() {
} }
@Override
public void transportResumed() { public void transportResumed() {
} }
}); });
clientTransport.start(); clientTransport.start();
} }
@ -103,6 +110,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
serverPort = server.getSocketAddress().getPort(); serverPort = server.getSocketAddress().getPort();
} }
@Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
ignoreClientError.set(true); ignoreClientError.set(true);
ignoreServerError.set(true); ignoreServerError.set(true);
@ -122,11 +130,13 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
super.tearDown(); super.tearDown();
} }
@Override
public void onAccept(Transport transport) { public void onAccept(Transport transport) {
try { try {
LOG.info("[" + getName() + "] Server Accepted a Connection"); LOG.info("[" + getName() + "] Server Accepted a Connection");
serverTransport = transport; serverTransport = transport;
serverTransport.setTransportListener(new TransportListener() { serverTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) { public void onCommand(Object command) {
serverReceiveCount.incrementAndGet(); serverReceiveCount.incrementAndGet();
if (serverRunOnCommand != null) { if (serverRunOnCommand != null) {
@ -134,6 +144,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void onException(IOException error) { public void onException(IOException error) {
if (!ignoreClientError.get()) { if (!ignoreClientError.get()) {
LOG.info("Server transport error:", error); LOG.info("Server transport error:", error);
@ -141,9 +152,11 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void transportInterupted() { public void transportInterupted() {
} }
@Override
public void transportResumed() { public void transportResumed() {
} }
}); });
@ -153,18 +166,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void onAcceptError(Exception error) { public void onAcceptError(Exception error) {
LOG.trace(error.toString()); LOG.trace(error.toString());
} }
public void testClientHang() throws Exception { public void testClientHang() throws Exception {
//
// Manually create a client transport so that it does not send KeepAlive // Manually create a client transport so that it does not send KeepAlive
// packets. // packets. this should simulate a client hang.
// this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null); clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
clientTransport.setTransportListener(new TransportListener() { clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) { public void onCommand(Object command) {
clientReceiveCount.incrementAndGet(); clientReceiveCount.incrementAndGet();
if (clientRunOnCommand != null) { if (clientRunOnCommand != null) {
@ -172,6 +184,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void onException(IOException error) { public void onException(IOException error) {
if (!ignoreClientError.get()) { if (!ignoreClientError.get()) {
LOG.info("Client transport error:"); LOG.info("Client transport error:");
@ -180,15 +193,18 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
} }
} }
@Override
public void transportInterupted() { public void transportInterupted() {
} }
@Override
public void transportResumed() { public void transportResumed() {
} }
}); });
clientTransport.start(); clientTransport.start();
WireFormatInfo info = new WireFormatInfo(); WireFormatInfo info = new WireFormatInfo();
info.setVersion(OpenWireFormat.DEFAULT_VERSION); info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION);
info.setMaxInactivityDuration(1000); info.setMaxInactivityDuration(1000);
clientTransport.oneway(info); clientTransport.oneway(info);
@ -223,12 +239,12 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
* @throws URISyntaxException * @throws URISyntaxException
*/ */
public void initCombosForTestNoClientHangWithServerBlock() throws Exception { public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
startClient(); startClient();
addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)}); addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)}); addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() { addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
@Override
public void run() { public void run() {
try { try {
LOG.info("Sleeping"); LOG.info("Sleeping");
@ -251,5 +267,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
assertEquals(0, clientErrorCount.get()); assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get()); assertEquals(0, serverErrorCount.get());
} }
} }