From a0af3515201afcf635100fc1fee6b336e378e8be Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Wed, 23 Sep 2009 19:06:58 +0000 Subject: [PATCH] AMQ-2408 and AMQ-2407 - adding new tool to manually inspect/audit the amqPersistenceAdapter's journal files. - adding new HTTP based discovery agent git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@818209 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-optional/pom.xml | 23 +- .../activemq/store/amq/AMQJournalTool.java | 353 ++++++++++++++++++ .../store/amq/AMQJournalToolCommand.java | 44 +++ .../store/amq/CommandLineSupport.java | 123 ++++++ .../store/amq/CustomResourceLoader.java | 97 +++++ .../org/apache/activemq/store/amq/Entry.java | 70 ++++ .../store/amq/MessageBodyFormatter.java | 62 +++ .../store/amq/reader/AMQIterator.java | 85 +++++ .../activemq/store/amq/reader/AMQReader.java | 172 +++++++++ .../store/amq/reader/MessageLocation.java | 60 +++ .../http/DiscoveryRegistryServlet.java | 106 ++++++ .../discovery/http/EmbeddedJettyServer.java | 77 ++++ .../discovery/http/HTTPDiscoveryAgent.java | 349 +++++++++++++++++ .../http/HTTPDiscoveryAgentFactory.java | 48 +++ .../activemq/transport/discoveryagent/http | 4 + .../org/apache/activemq/store/amq/help.txt | 52 +++ .../store/amq/reader/AMQReaderTest.java | 56 +++ .../activemq/store/amq/reader/data/data-1 | Bin 0 -> 64331 bytes .../activemq/store/amq/reader/data/data-2 | Bin 0 -> 64835 bytes .../activemq/store/amq/reader/data/data-3 | Bin 0 -> 64638 bytes .../activemq/store/amq/reader/data/data-4 | Bin 0 -> 65397 bytes .../activemq/store/amq/reader/data/data-5 | Bin 0 -> 65390 bytes .../activemq/store/amq/reader/data/data-6 | Bin 0 -> 65397 bytes .../activemq/store/amq/reader/data/data-7 | Bin 0 -> 64833 bytes .../activemq/store/amq/reader/data/data-8 | Bin 0 -> 65397 bytes .../activemq/store/amq/reader/data/data-9 | Bin 0 -> 65014 bytes 26 files changed, 1779 insertions(+), 2 deletions(-) create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java create mode 100644 activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java create mode 100755 activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java create mode 100644 activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http create mode 100644 activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt create mode 100644 activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8 create mode 100644 activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9 diff --git a/activemq-optional/pom.xml b/activemq-optional/pom.xml index a19456afa0..56d838317b 100755 --- a/activemq-optional/pom.xml +++ b/activemq-optional/pom.xml @@ -44,7 +44,11 @@ ${pom.groupId} activeio-core - + + ${pom.groupId} + activemq-console + + org.springframework spring @@ -153,7 +157,22 @@ org.codehaus.jettison jettison test - + + + + velocity + velocity + + + net.sf.josql + josql + + + net.sf.josql + gentlyweb-utils + + + diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java new file mode 100644 index 0000000000..736d012007 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq; + +import java.io.File; +import java.io.InputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Scanner; + +import org.apache.activemq.command.ActiveMQBlobMessage; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.JournalQueueAck; +import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.JournalTrace; +import org.apache.activemq.command.JournalTransaction; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.velocity.Template; +import org.apache.velocity.VelocityContext; +import org.apache.velocity.app.Velocity; +import org.apache.velocity.app.VelocityEngine; +import org.josql.Query; + +/** + * Allows you to view the contents of a Journal. + * + * @author Hiram Chirino + */ +public class AMQJournalTool { + + private final ArrayList dirs = new ArrayList(); + private final WireFormat wireFormat = new OpenWireFormat(); + private final HashMap resources = new HashMap(); + + private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}"; + private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}"; + private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}"; + private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}"; + private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}"; + private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}"; + private String where; + private VelocityContext context; + private VelocityEngine velocity; + private boolean help; + + public static void main(String[] args) throws Exception { + AMQJournalTool consumerTool = new AMQJournalTool(); + String[] directories = CommandLineSupport + .setOptions(consumerTool, args); + if (directories.length < 1) { + System.out + .println("Please specify the directories with journal data to scan"); + return; + } + for (int i = 0; i < directories.length; i++) { + consumerTool.getDirs().add(new File(directories[i])); + } + consumerTool.execute(); + } + + public void execute() throws Exception { + + if( help ) { + showHelp(); + return; + } + + if (getDirs().size() < 1) { + System.out.println(""); + System.out.println("Invalid Usage: Please specify the directories with journal data to scan"); + System.out.println(""); + showHelp(); + return; + } + + for (File dir : getDirs()) { + if( !dir.exists() ) { + System.out.println(""); + System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist"); + System.out.println(""); + showHelp(); + return; + } + if( !dir.isDirectory() ) { + System.out.println(""); + System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory"); + System.out.println(""); + showHelp(); + return; + } + } + + + context = new VelocityContext(); + List keys = Arrays.asList(context.getKeys()); + + for (Iterator iterator = System.getProperties().entrySet() + .iterator(); iterator.hasNext();) { + Map.Entry kv = (Map.Entry) iterator.next(); + String name = (String) kv.getKey(); + String value = (String) kv.getValue(); + + if (!keys.contains(name)) { + context.put(name, value); + } + } + + velocity = new VelocityEngine(); + velocity.setProperty(Velocity.RESOURCE_LOADER, "all"); + velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName()); + velocity.init(); + + + resources.put("message", messageFormat); + resources.put("topicAck", topicAckFormat); + resources.put("queueAck", queueAckFormat); + resources.put("transaction", transactionFormat); + resources.put("trace", traceFormat); + resources.put("unknown", unknownFormat); + + Query query = null; + if (where != null) { + query = new Query(); + query.parse("select * from "+Entry.class.getName()+" where "+where); + + } + + ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs()); + manager.start(); + try { + Location curr = manager.getFirstLocation(); + while (curr != null) { + + ByteSequence data = manager.read(curr); + DataStructure c = (DataStructure) wireFormat.unmarshal(data); + + Entry entry = new Entry(); + entry.setLocation(curr); + entry.setRecord(c); + entry.setData(data); + entry.setQuery(query); + process(entry); + + curr = manager.getNextLocation(curr); + } + } finally { + manager.close(); + } + } + + private void showHelp() { + InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt"); + Scanner scanner = new Scanner(is); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + System.out.println(line); + } + scanner.close(); } + + private void process(Entry entry) throws Exception { + + Location location = entry.getLocation(); + DataStructure record = entry.getRecord(); + + switch (record.getDataStructureType()) { + case ActiveMQMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQMessage"); + entry.setFormater("message"); + display(entry); + break; + case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQBytesMessage"); + entry.setFormater("message"); + display(entry); + break; + case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQBlobMessage"); + entry.setFormater("message"); + display(entry); + break; + case ActiveMQMapMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQMapMessage"); + entry.setFormater("message"); + display(entry); + break; + case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQObjectMessage"); + entry.setFormater("message"); + display(entry); + break; + case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQStreamMessage"); + entry.setFormater("message"); + display(entry); + break; + case ActiveMQTextMessage.DATA_STRUCTURE_TYPE: + entry.setType("ActiveMQTextMessage"); + entry.setFormater("message"); + display(entry); + break; + case JournalQueueAck.DATA_STRUCTURE_TYPE: + entry.setType("Queue Ack"); + entry.setFormater("queueAck"); + display(entry); + break; + case JournalTopicAck.DATA_STRUCTURE_TYPE: + entry.setType("Topic Ack"); + entry.setFormater("topicAck"); + display(entry); + break; + case JournalTransaction.DATA_STRUCTURE_TYPE: + entry.setType(getType((JournalTransaction) record)); + entry.setFormater("transaction"); + display(entry); + break; + case JournalTrace.DATA_STRUCTURE_TYPE: + entry.setType("Trace"); + entry.setFormater("trace"); + display(entry); + break; + default: + entry.setType("Unknown"); + entry.setFormater("unknown"); + display(entry); + break; + } + } + + private String getType(JournalTransaction record) { + switch (record.getType()) { + case JournalTransaction.XA_PREPARE: + return "XA Prepare"; + case JournalTransaction.XA_COMMIT: + return "XA Commit"; + case JournalTransaction.XA_ROLLBACK: + return "XA Rollback"; + case JournalTransaction.LOCAL_COMMIT: + return "Commit"; + case JournalTransaction.LOCAL_ROLLBACK: + return "Rollback"; + } + return "Unknown Transaction"; + } + + private void display(Entry entry) throws Exception { + + if (entry.getQuery() != null) { + List list = Collections.singletonList(entry); + List results = entry.getQuery().execute(list).getResults(); + if (results.isEmpty()) { + return; + } + } + + CustomResourceLoader.setResources(resources); + try { + + context.put("location", entry.getLocation()); + context.put("record", entry.getRecord()); + context.put("type", entry.getType()); + if (entry.getRecord() instanceof ActiveMQMessage) { + context.put("body", new MessageBodyFormatter( + (ActiveMQMessage) entry.getRecord())); + } + + Template template = velocity.getTemplate(entry.getFormater()); + PrintWriter writer = new PrintWriter(System.out); + template.merge(context, writer); + writer.println(); + writer.flush(); + } finally { + CustomResourceLoader.setResources(null); + } + } + + public void setMessageFormat(String messageFormat) { + this.messageFormat = messageFormat; + } + + public void setTopicAckFormat(String ackFormat) { + this.topicAckFormat = ackFormat; + } + + public void setTransactionFormat(String transactionFormat) { + this.transactionFormat = transactionFormat; + } + + public void setTraceFormat(String traceFormat) { + this.traceFormat = traceFormat; + } + + public void setUnknownFormat(String unknownFormat) { + this.unknownFormat = unknownFormat; + } + + public void setQueueAckFormat(String queueAckFormat) { + this.queueAckFormat = queueAckFormat; + } + + public String getQuery() { + return where; + } + + public void setWhere(String query) { + this.where = query; + } + + public boolean isHelp() { + return help; + } + + public void setHelp(boolean help) { + this.help = help; + } + + /** + * @return the dirs + */ + public ArrayList getDirs() { + return dirs; + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java new file mode 100644 index 0000000000..e3fb5a2609 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq; + +import java.io.File; +import java.util.List; + +import org.apache.activemq.console.CommandContext; +import org.apache.activemq.console.command.Command; + +public class AMQJournalToolCommand implements Command { + + private CommandContext context; + + public void execute(List tokens) throws Exception { + AMQJournalTool consumerTool = new AMQJournalTool(); + String args[] = new String[tokens.size()]; + tokens.toArray(args); + String[] directories = CommandLineSupport.setOptions(consumerTool, args); + for (int i = 0; i < directories.length; i++) { + consumerTool.getDirs().add(new File(directories[i])); + } + consumerTool.execute(); + } + + public void setCommandContext(CommandContext context) { + this.context = context; + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java new file mode 100644 index 0000000000..bffba306fb --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq; + +import java.util.ArrayList; + +import org.apache.activemq.util.IntrospectionSupport; + +/** + * Helper utility that can be used to set the properties on any object using + * command line arguments. + * + * @author Hiram Chirino + */ +public final class CommandLineSupport { + + private CommandLineSupport() { + } + + /** + * Sets the properties of an object given the command line args. + * + * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent + * + * then it will try to call the following setters on the target object. + * + * target.setAckMode("AUTO"); + * target.setURL(new URI("tcp://localhost:61616") ); + * target.setPersistent(true); + * + * Notice the the proper conversion for the argument is determined by examining the + * setter arguement type. + * + * @param target the object that will have it's properties set + * @param args the commline options + * @return any arguments that are not valid options for the target + */ + public static String[] setOptions(Object target, String[] args) { + ArrayList rc = new ArrayList(); + + for (int i = 0; i < args.length; i++) { + if (args[i] == null) { + continue; + } + + if (args[i].startsWith("--")) { + + // --options without a specified value are considered boolean + // flags that are enabled. + String value = "true"; + String name = args[i].substring(2); + + // if --option=value case + int p = name.indexOf("="); + if (p > 0) { + value = name.substring(p + 1); + name = name.substring(0, p); + } + + // name not set, then it's an unrecognized option + if (name.length() == 0) { + rc.add(args[i]); + continue; + } + + String propName = convertOptionToPropertyName(name); + if (!IntrospectionSupport.setProperty(target, propName, value)) { + rc.add(args[i]); + continue; + } + } else { + rc.add(args[i]); + } + + } + + String r[] = new String[rc.size()]; + rc.toArray(r); + return r; + } + + /** + * converts strings like: test-enabled to testEnabled + * + * @param name + * @return + */ + private static String convertOptionToPropertyName(String name) { + String rc = ""; + + // Look for '-' and strip and then convert the subsequent char to + // uppercase + int p = name.indexOf("-"); + while (p > 0) { + // strip + rc += name.substring(0, p); + name = name.substring(p + 1); + + // can I convert the next char to upper? + if (name.length() > 0) { + rc += name.substring(0, 1).toUpperCase(); + name = name.substring(1); + } + + p = name.indexOf("-"); + } + return rc + name; + } +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java new file mode 100644 index 0000000000..7a44018c73 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; + +import org.apache.commons.collections.ExtendedProperties; +import org.apache.velocity.exception.ResourceNotFoundException; +import org.apache.velocity.runtime.RuntimeServices; +import org.apache.velocity.runtime.resource.Resource; +import org.apache.velocity.runtime.resource.loader.FileResourceLoader; +import org.apache.velocity.runtime.resource.loader.ResourceLoader; + +public class CustomResourceLoader extends ResourceLoader { + + private final static ThreadLocal> resourcesTL = new ThreadLocal>(); + private final FileResourceLoader fileResourceLoader = new FileResourceLoader(); + + @Override + public void commonInit(RuntimeServices rs, ExtendedProperties configuration) { + super.commonInit(rs, configuration); + fileResourceLoader.commonInit(rs, configuration); + } + + public void init( ExtendedProperties configuration) + { + fileResourceLoader.init(configuration); + } + + /** + */ + public synchronized InputStream getResourceStream( String name ) + throws ResourceNotFoundException + { + InputStream result = null; + + if (name == null || name.length() == 0) + { + throw new ResourceNotFoundException ("No template name provided"); + } + + String value = null; + HashMap resources = resourcesTL.get(); + if( resources!=null ) { + value = resources.get(name); + } + + if( value == null ) { + result = this.fileResourceLoader.getResourceStream(name); + } else { + try + { + result = new ByteArrayInputStream(value.getBytes()); + } + catch( Exception e ) + { + throw new ResourceNotFoundException( e.getMessage() ); + } + } + return result; + } + + public boolean isSourceModified(Resource resource) + { + return false; + } + + public long getLastModified(Resource resource) + { + return 0; + } + + static public HashMap getResources() { + return resourcesTL.get(); + } + + static public void setResources(HashMap arg0) { + resourcesTL.set(arg0); + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java new file mode 100644 index 0000000000..17cd104b10 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq; + +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.util.ByteSequence; +import org.josql.Query; + +public class Entry { + + Location location; + DataStructure record; + private ByteSequence data; + private String type; + private String formater; + private Query query; + + public Location getLocation() { + return location; + } + public void setLocation(Location location) { + this.location = location; + } + public DataStructure getRecord() { + return record; + } + public void setRecord(DataStructure record) { + this.record = record; + } + public void setData(ByteSequence data) { + this.data = data; + } + public void setType(String type) { + this.type = type; + } + public ByteSequence getData() { + return data; + } + public String getType() { + return type; + } + public void setFormater(String formater) { + this.formater = formater; + } + public String getFormater() { + return formater; + } + public void setQuery(Query query) { + this.query = query; + } + public Query getQuery() { + return query; + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java new file mode 100644 index 0000000000..e2d23dc32c --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBlobMessage; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.util.ByteSequence; + +public class MessageBodyFormatter { + final ActiveMQMessage message; + + public MessageBodyFormatter(ActiveMQMessage message) { + this.message=message; + } + + @Override + public String toString() { + try { + switch (message.getDataStructureType()) { + case ActiveMQMessage.DATA_STRUCTURE_TYPE: + return ""; + case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE: + ActiveMQBlobMessage blob = (ActiveMQBlobMessage) message; + return blob.getRemoteBlobUrl(); + case ActiveMQMapMessage.DATA_STRUCTURE_TYPE: + ActiveMQMapMessage map = (ActiveMQMapMessage)message; + return map.getContentMap().toString(); + case ActiveMQTextMessage.DATA_STRUCTURE_TYPE: + ActiveMQTextMessage text = (ActiveMQTextMessage)message; + return text.getText(); + case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE: + case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE: + case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE: + ByteSequence data = message.getContent(); + return "binary payload {length="+data.getLength()+", compressed="+message.isCompressed()+"}"; + } + } catch (JMSException e) { + } + return ""; + } +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java new file mode 100644 index 0000000000..1e3e0a502f --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq.reader; + +import java.util.Iterator; + +import javax.jms.Message; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.MessageEvaluationContext; + +/** + * An Iterator for the AMQReader + * + */ +class AMQIterator implements Iterator{ + private AMQReader reader; + private BooleanExpression expression; + private MessageLocation currentLocation; + private MessageLocation nextLocation; + private boolean valid=true; + + + AMQIterator(AMQReader reader, BooleanExpression expression){ + this.reader=reader; + this.expression=expression; + } + + public boolean hasNext() { + try { + this.nextLocation = reader.getNextMessage(currentLocation); + Message next = nextLocation != null ? nextLocation.getMessage() + : null; + if (expression == null) { + return next != null; + } else { + while (next != null) { + MessageEvaluationContext context = new MessageEvaluationContext(); + context.setMessageReference((MessageReference) next); + if (expression.matches(context)) { + return true; + } + this.nextLocation = reader.getNextMessage(currentLocation); + next = nextLocation != null ? nextLocation.getMessage() + : null; + } + valid=false; + return false; + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to get next message from reader ", e); + } + } + + + public Message next() { + if (valid && (nextLocation != null || hasNext())) { + this.currentLocation=nextLocation; + return nextLocation.getMessage(); + } + return null; + } + + + public void remove() { + throw new IllegalStateException("Not supported"); + + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java new file mode 100644 index 0000000000..0b90924351 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq.reader; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.InvalidSelectorException; +import javax.jms.Message; + +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.kaha.impl.async.AsyncDataManager; +import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; + +/** + * Reads and iterates through data log files for the AMQMessage Store + * + */ +public class AMQReader implements Iterable { + + private AsyncDataManager dataManager; + private WireFormat wireFormat = new OpenWireFormat(); + private File file; + private BooleanExpression expression; + + /** + * List all the data files in a directory + * @param directory + * @return + * @throws IOException + */ + public static Set listDataFiles(File directory) throws IOException{ + Setresult = new HashSet(); + if (directory == null || !directory.exists() || !directory.isDirectory()) { + throw new IOException("Invalid Directory " + directory); + } + AsyncDataManager dataManager = new AsyncDataManager(); + dataManager.setDirectory(directory); + dataManager.start(); + Set set = dataManager.getFiles(); + if (set != null) { + result.addAll(set); + } + dataManager.close(); + return result; + } + /** + * Create the AMQReader to read a directory of amq data logs - or an + * individual data log file + * + * @param file the directory - or file + * @throws IOException + * @throws InvalidSelectorException + * @throws IOException + * @throws InvalidSelectorException + */ + public AMQReader(File file) throws InvalidSelectorException, IOException { + this(file,null); + } + + /** + * Create the AMQReader to read a directory of amq data logs - or an + * individual data log file + * + * @param file the directory - or file + * @param selector the JMS selector or null to select all + * @throws IOException + * @throws InvalidSelectorException + */ + public AMQReader(File file, String selector) throws IOException, InvalidSelectorException { + String str = selector != null ? selector.trim() : null; + if (str != null && str.length() > 0) { + this.expression=SelectorParser.parse(str); + } + dataManager = new AsyncDataManager(); + dataManager.setArchiveDataLogs(false); + if (file.isDirectory()) { + dataManager.setDirectory(file); + } else { + dataManager.setDirectory(file.getParentFile()); + dataManager.setDirectoryArchive(file); + this.file = file; + } + dataManager.start(); + } + + public Iterator iterator() { + return new AMQIterator(this,this.expression); + } + + + protected MessageLocation getNextMessage(MessageLocation lastLocation) + throws IllegalStateException, IOException { + if (this.file != null) { + return getInternalNextMessage(this.file, lastLocation); + } + return getInternalNextMessage(lastLocation); + } + + private MessageLocation getInternalNextMessage(MessageLocation lastLocation) + throws IllegalStateException, IOException { + return getInternalNextMessage(null, lastLocation); + } + + private MessageLocation getInternalNextMessage(File file, + MessageLocation lastLocation) throws IllegalStateException, + IOException { + MessageLocation result = lastLocation; + if (result != null) { + result.setMessage(null); + } + Message message = null; + Location pos = lastLocation != null ? lastLocation.getLocation() : null; + while ((pos = getNextLocation(file, pos)) != null) { + message = getMessage(pos); + if (message != null) { + if (result == null) { + result = new MessageLocation(); + } + result.setMessage(message); + break; + } + } + result.setLocation(pos); + if (pos == null && message == null) { + result = null; + } else { + result.setLocation(pos); + } + return result; + } + + private Location getNextLocation(File file, Location last) + throws IllegalStateException, IOException { + if (file != null) { + return dataManager.getNextLocation(file, last, true); + } + return dataManager.getNextLocation(last); + } + + private Message getMessage(Location location) throws IOException { + ByteSequence data = dataManager.read(location); + DataStructure c = (DataStructure) wireFormat.unmarshal(data); + if (c instanceof Message) { + return (Message) c; + } + return null; + + } +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java new file mode 100644 index 0000000000..e4c68c6fee --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq.reader; + +import javax.jms.Message; + +import org.apache.activemq.kaha.impl.async.Location; + +/** + * A holder for a message + * + */ +class MessageLocation { + private Message message; + private Location location; + + + /** + * @return the location + */ + public Location getLocation() { + return location; + } + + /** + * @param location + */ + public void setLocation(Location location) { + this.location = location; + } + /** + * @return the message + */ + public Message getMessage() { + return message; + } + + /** + * @param message + */ + public void setMessage(Message message) { + this.message = message; + } + + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java new file mode 100644 index 0000000000..970739d30c --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.discovery.http; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class DiscoveryRegistryServlet extends HttpServlet { + + private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class); + long maxKeepAge = 1000*60*60; // 1 hour. + ConcurrentHashMap> serviceGroups = new ConcurrentHashMap>(); + + @Override + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + String group = req.getPathInfo(); + String service = req.getHeader("service"); + LOG.debug("Registering: group="+group+", service="+service); + + ConcurrentHashMap services = getServiceGroup(group); + services.put(service, System.currentTimeMillis()); + } + + private ConcurrentHashMap getServiceGroup(String group) { + ConcurrentHashMap rc = serviceGroups.get(group); + if( rc == null ) { + rc = new ConcurrentHashMap(); + serviceGroups.put(group, rc); + } + return rc; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + try { + long freshness = 1000*30; + String p = req.getParameter("freshness"); + if( p!=null ) { + freshness = Long.parseLong(p); + } + + String group = req.getPathInfo(); + LOG.debug("group="+group); + ConcurrentHashMap services = getServiceGroup(group); + PrintWriter writer = resp.getWriter(); + + long now = System.currentTimeMillis(); + long dropTime = now-maxKeepAge; + long minimumTime = now-freshness; + + ArrayList dropList = new ArrayList(); + for (Map.Entry entry : services.entrySet()) { + if( entry.getValue() > minimumTime ) { + writer.println(entry.getKey()); + } else if( entry.getValue() < dropTime ) { + dropList.add(entry.getKey()); + } + } + + // We might as well get rid of the really old entries. + for (String service : dropList) { + services.remove(service); + } + + + } catch (Exception e) { + resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error occured: "+e); + } + } + + @Override + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + String group = req.getPathInfo(); + String service = req.getHeader("service"); + LOG.debug("Unregistering: group="+group+", service="+service); + + ConcurrentHashMap services = getServiceGroup(group); + services.remove(service); + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java new file mode 100644 index 0000000000..c26ae7360e --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.discovery.http; + +import java.net.URI; + +import org.mortbay.jetty.Server; +import org.mortbay.jetty.nio.SelectChannelConnector; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; + +public class EmbeddedJettyServer implements org.apache.activemq.Service { + + private HTTPDiscoveryAgent agent; + private Server server; + private SelectChannelConnector connector; + private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet(); + + public void start() throws Exception { + URI uri = new URI(agent.getRegistryURL()); + + server = new Server(); + Context context = new Context(Context.NO_SECURITY | Context.NO_SESSIONS); + + context.setContextPath("/"); + ServletHolder holder = new ServletHolder(); + holder.setServlet(camelServlet); + context.addServlet(holder, "/*"); + server.setHandler(context); + server.start(); + + int port = 80; + if( uri.getPort() >=0 ) { + port = uri.getPort(); + } + + connector = new SelectChannelConnector(); + connector.setPort(port); + server.addConnector(connector); + connector.start(); + } + + public void stop() throws Exception { + if( connector!=null ) { + connector.stop(); + connector = null; + } + if( server!=null ) { + server.stop(); + server = null; + } + } + + public HTTPDiscoveryAgent getAgent() { + return agent; + } + + public void setAgent(HTTPDiscoveryAgent agent) { + this.agent = agent; + } + + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java new file mode 100644 index 0000000000..46e2328bd1 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.discovery.http; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.Service; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.DeleteMethod; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PutMethod; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class HTTPDiscoveryAgent implements DiscoveryAgent { + + private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class); + + private String registryURL = "http://localhost:8080/discovery-registry/default"; + private HttpClient httpClient = new HttpClient(); + private AtomicBoolean running=new AtomicBoolean(); + private final AtomicReference discoveryListener = new AtomicReference(); + private final HashSet registeredServices = new HashSet(); + private final HashMap discoveredServices = new HashMap(); + private Thread thread; + private long updateInterval = 1000*10; + private String brokerName; + private boolean startEmbeddRegistry=false; + private Service jetty; + private AtomicInteger startCounter=new AtomicInteger(0); + + + private long initialReconnectDelay = 1000; + private long maxReconnectDelay = 1000 * 30; + private long backOffMultiplier = 2; + private boolean useExponentialBackOff=true; + private int maxReconnectAttempts; + private final Object sleepMutex = new Object(); + private long minConnectTime = 5000; + + class SimpleDiscoveryEvent extends DiscoveryEvent { + + private int connectFailures; + private long reconnectDelay = initialReconnectDelay; + private long connectTime = System.currentTimeMillis(); + private AtomicBoolean failed = new AtomicBoolean(false); + private AtomicBoolean removed = new AtomicBoolean(false); + + public SimpleDiscoveryEvent(String service) { + super(service); + } + + } + + + public String getGroup() { + return null; + } + + public void registerService(String service) throws IOException { + synchronized(registeredServices) { + registeredServices.add(service); + } + doRegister(service); + } + + synchronized private void doRegister(String service) { + String url = registryURL; + try { + PutMethod method = new PutMethod(url); +// method.setParams(createParams()); + method.setRequestHeader("service", service); + int responseCode = httpClient.executeMethod(method); + LOG.debug("PUT to "+url+" got a "+responseCode); + } catch (Exception e) { + LOG.debug("PUT to "+url+" failed with: "+e); + } + } + + synchronized private void doUnRegister(String service) { + String url = registryURL; + try { + DeleteMethod method = new DeleteMethod(url); +// method.setParams(createParams()); + method.setRequestHeader("service", service); + int responseCode = httpClient.executeMethod(method); + LOG.debug("DELETE to "+url+" got a "+responseCode); + } catch (Exception e) { + LOG.debug("DELETE to "+url+" failed with: "+e); + } + } + +// private HttpMethodParams createParams() { +// HttpMethodParams params = new HttpMethodParams(); +// params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false)); +// return params; +// } + + synchronized private Set doLookup(long freshness) { + String url = registryURL+"?freshness="+freshness; + try { + GetMethod method = new GetMethod(url); +// method.setParams(createParams()); + int responseCode = httpClient.executeMethod(method); + LOG.debug("GET to "+url+" got a "+responseCode); + if( responseCode == 200 ) { + Set rc = new HashSet(); + Scanner scanner = new Scanner(method.getResponseBodyAsStream()); + while( scanner.hasNextLine() ) { + String service = scanner.nextLine(); + if( service.trim().length() != 0 ) { + rc.add(service); + } + } + return rc; + } else { + LOG.debug("GET to "+url+" failed with response code: "+responseCode); + return null; + } + } catch (Exception e) { + LOG.debug("GET to "+url+" failed with: "+e); + return null; + } + } + + public void serviceFailed(DiscoveryEvent devent) throws IOException { + + final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent; + if (event.failed.compareAndSet(false, true)) { + discoveryListener.get().onServiceRemove(event); + if(!event.removed.get()) { + // Setup a thread to re-raise the event... + Thread thread = new Thread() { + public void run() { + + // We detect a failed connection attempt because the service + // fails right away. + if (event.connectTime + minConnectTime > System.currentTimeMillis()) { + LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event); + + event.connectFailures++; + + if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { + LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled."); + return; + } + + synchronized (sleepMutex) { + try { + if (!running.get() || event.removed.get()) { + return; + } + LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect."); + sleepMutex.wait(event.reconnectDelay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + + if (!useExponentialBackOff) { + event.reconnectDelay = initialReconnectDelay; + } else { + // Exponential increment of reconnect delay. + event.reconnectDelay *= backOffMultiplier; + if (event.reconnectDelay > maxReconnectDelay) { + event.reconnectDelay = maxReconnectDelay; + } + } + + } else { + event.connectFailures = 0; + event.reconnectDelay = initialReconnectDelay; + } + + if (!running.get() || event.removed.get()) { + return; + } + + event.connectTime = System.currentTimeMillis(); + event.failed.set(false); + discoveryListener.get().onServiceAdd(event); + } + }; + thread.setDaemon(true); + thread.start(); + } + } + } + + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public void setDiscoveryListener(DiscoveryListener discoveryListener) { + this.discoveryListener.set(discoveryListener); + } + + public void setGroup(String group) { + } + + public void start() throws Exception { + if( startCounter.addAndGet(1)==1 ) { + if( startEmbeddRegistry ) { + jetty = createEmbeddedJettyServer(); + Map props = new HashMap(); + props.put("agent", this); + IntrospectionSupport.setProperties(jetty, props); + jetty.start(); + } + + running.set(true); + thread = new Thread("HTTPDiscovery Agent") { + @Override + public void run() { + while(running.get()) { + try { + update(); + Thread.sleep(updateInterval); + } catch (InterruptedException e) { + return; + } + } + } + }; + thread.setDaemon(true); + thread.start(); + } + } + + /** + * Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on + * jetty. + * + * @return + * @throws Exception + */ + private Service createEmbeddedJettyServer() throws Exception { + Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer"); + return (Service)clazz.newInstance(); + } + + private void update() { + // Register all our services... + synchronized(registeredServices) { + for (String service : registeredServices) { + doRegister(service); + } + } + + // Find new registered services... + DiscoveryListener discoveryListener = this.discoveryListener.get(); + if(discoveryListener!=null) { + Set activeServices = doLookup(updateInterval*3); + // If there is error talking the the central server, then activeServices == null + if( activeServices !=null ) { + synchronized(discoveredServices) { + + HashSet removedServices = new HashSet(discoveredServices.keySet()); + removedServices.removeAll(activeServices); + + HashSet addedServices = new HashSet(activeServices); + addedServices.removeAll(discoveredServices.keySet()); + addedServices.removeAll(removedServices); + + for (String service : addedServices) { + SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service); + discoveredServices.put(service, e); + discoveryListener.onServiceAdd(e); + } + + for (String service : removedServices) { + SimpleDiscoveryEvent e = discoveredServices.remove(service); + if( e !=null ) { + e.removed.set(true); + } + discoveryListener.onServiceRemove(e); + } + } + } + } + } + + public void stop() throws Exception { + if( startCounter.decrementAndGet()==0 ) { + running.set(false); + if( thread!=null ) { + thread.join(updateInterval*3); + thread=null; + } + if( jetty!=null ) { + jetty.stop(); + jetty = null; + } + } + } + + public String getRegistryURL() { + return registryURL; + } + + public void setRegistryURL(String discoveryRegistryURL) { + this.registryURL = discoveryRegistryURL; + } + + public long getUpdateInterval() { + return updateInterval; + } + + public void setUpdateInterval(long updateInterval) { + this.updateInterval = updateInterval; + } + + public boolean isStartEmbeddRegistry() { + return startEmbeddRegistry; + } + + public void setStartEmbeddRegistry(boolean startEmbeddRegistry) { + this.startEmbeddRegistry = startEmbeddRegistry; + } + +} diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java new file mode 100755 index 0000000000..45c50bc9e3 --- /dev/null +++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.discovery.http; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; + +public class HTTPDiscoveryAgentFactory extends DiscoveryAgentFactory { + + protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException { + try { + + Map options = URISupport.parseParamters(uri); + uri = URISupport.removeQuery(uri); + + HTTPDiscoveryAgent rc = new HTTPDiscoveryAgent(); + rc.setRegistryURL(uri.toString()); + + IntrospectionSupport.setProperties(rc, options); + + return rc; + + } catch (Throwable e) { + throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e); + } + } +} diff --git a/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http b/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http new file mode 100644 index 0000000000..f7c63deea3 --- /dev/null +++ b/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http @@ -0,0 +1,4 @@ +## --------------------------------------------------------------------------- +## +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.discovery.http.HTTPDiscoveryAgentFactory diff --git a/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt b/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt new file mode 100644 index 0000000000..780564290e --- /dev/null +++ b/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt @@ -0,0 +1,52 @@ +Usage: + java org.apache.activemq.store.amq.AMQJournalTool [options]* (directory) * + +Displays the records stored in the Journal log files used by ActiveMQ. This +tool supports loading the journal data files from multiple directories. Normally +it is run against the journal archive directory and the active journal directory. + +This tool supports controlling the output format using Velocity [1] templates. +It also supports filtering out records using a SQL like WHERE syntax implemented +using JoSQL. + +Options to control output format: + +Any valid Velocity Template Language (VTL) expression can be used to control the +display of the record. + + --message-format=VTL The format used to display message records. Message + records get created every time a producer sends a persistent message to the broker. + The message gets recorded in the journal even if it's transaction is rolled back. + Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body} + + --topic-ack-format=VTL The format used to display topic ack records. A topic + ack records that a durable subscription for a topic has acknowleged a set of messages. + Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId} + + --queue-ack-format=VTL The format used to display queue ack records. A queue + ack records that a consumer for a quue has acknowleged a message. + Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId} + + --transaction-format=VTL The format used to display transaction records. Transaction records + are used to record transaction related actions like commit and rollback. + Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.transactionId} + + --trace-format=VTL The format used to display trace records. + Trace records are informational messages stored in the journal that assist in Auditing. + For example a trace message is recorded whenever the broker is restarted or when the + long term store is checkpointed. + Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.message} + +Options to control the selection of records displayed: + --where=VALUE The where clause used to control the records selected + for display. It can select on all the fields available in the velocity context. + example: --where="type='ActiveMQTextMessage' and location.dataFileId > 2" + +Other Options: + --help Show this help screen. + +Example: + + java org.apache.activemq.store.amq.AMQJournalTool /path/to/archive /path/to/journal + + \ No newline at end of file diff --git a/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java b/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java new file mode 100644 index 0000000000..3bb6491932 --- /dev/null +++ b/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.amq.reader; + +import java.io.File; +import java.util.Set; + +import javax.jms.Message; +import junit.framework.TestCase; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + + +public class AMQReaderTest extends TestCase { + + public void testIterateArchive() throws Exception{ + String resourceName = getClass().getPackage().getName() + File.separator + "data"; + resourceName = resourceName.replace('.', File.separatorChar); + Resource resource = new ClassPathResource(resourceName); + AMQReader reader = new AMQReader(resource.getFile()); + for (Message m:reader) { + assertNotNull(m); + } + } + + public void xtestIterateFile() throws Exception{ + String resourceName = getClass().getPackage().getName() + File.separator + "data"; + resourceName = resourceName.replace('.', File.separatorChar); + Resource resource = new ClassPathResource(resourceName); + Set files = AMQReader.listDataFiles(resource.getFile()); + assertNotNull(files); + assertTrue(files.size() >0); + for (File file: files) { + System.err.println("READING " + file); + AMQReader reader = new AMQReader(file); + for (Message m:reader) { + assertNotNull(m); + } + } + } + +} diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1 new file mode 100644 index 0000000000000000000000000000000000000000..bbae0c2e84822f94c97d41b13ced46f6b8c9fd2d GIT binary patch literal 64331 zcmeI5cYGAb702&vi(Z5TqQlXvPC8X1AhCmlO(G-=3M6(27UGN)3=%4&ke=8nPV5w? z7pM1LonD;c^u+1C_uhNR-R%4xnH{Y&rv%^3$3OkS%#MV1=k4nE{k^xdd#O~aGo2dm zYhU-?!tna$bZT{bM`uS*M|-BHe=O6U@5|&GGxb~Aa!qZy=FBC#dom66^{pN1$Lr3F z_p4Hc-&NcRc2wA7^y6I^G5=Sz>y3q7uie|RqkDhHUZ#7cUD!`o8i&$Z*s43D zj-;aQc8;WC`hvnwR;1IX3kS5Tz3uS8(c`^++5W-f`P!CTYp%XFm)p|X+}PUC+*03o zVQoWOgPsq5X*!)xr*ebCN3*>{y?y=pY;WJ_z^VN4li8vC@KxE1cVD(|bZ|J|J~rHY z`09Mm;Lt!{Pkv;y5U~^g!%kxdF(bB}x}Y$SuDR{jBelgdQ`kzMcm4$xlO|7@I&J!l znX_hB&Z(-NJFjN`f`y9~FIl>5d1l4RRjb#mUAKP2#!Z`Rvvu{khQ_Ammevcmv~AtC z{VsRC=;FKWXz$p0$)&sQevi)GUEO;w+q)w}Nai4?tJ#=O7;l3mJtB&>$ z-0#@c$4?9nog5w+9XoaZ2R!gWryu-~hd%7#k9g#x9(~QVk9q9l9)H~vo_PHYPkQoG zo_gccp8kwyKI_@fdG7O`f71(I_@WoTeHY3?B{O#{1?9Xr7wTwt6%&2H@^9; zZ-3{z-~0X#e)yvw|Kz7X`}r?^`Kw?5=C{B5{U84Lr$7JYuYdddKmPfzGn_xq|DF2! z*8&l1ltg4PcrN6!QUCHgR4%QZ#tvd;Y@2dPxg;)#%d>U?l4!OQlgmk=y36zQ>}Ms) zWfR>c-6h@ioj$uTWfEITE}O9B>MkdTDwpT$na;cIry2d|Nx!=N-98rQVi{Oe){age_r9*b=s9N%0@O z@nGu}sgi^bLZ@ojo{-AACmIuJ6p&^KprqqWswttSg+(A0zve_ukxJ2*kZKD_MN*Me zBo&mwXcej2Fcc}iSEO=Me|6jr2}vo{KMRrNOh{1utI}u6mieuedCELxo-z+DH(Dk0 zTQTG+ms3M^#T9y{)w;FiavSB6a!I+QTtX9#R>|cy47tkXv{2=8lAh^Y-P&@wopMRJ zq+C)ip@~MThEgor-y3nC+o9@^-Q+0zlg@3#-7HW#@^I-y`^M+2fpQrvA2x0KEC*( zy-M-(((!T8sMB;@NmWY4-r8|FZ*ZbwKOa{fE;?DqvXi7DsYoi43d&%#ic~u>Y8M7Q zA(fR2=;K4V4C@Z71W?j(zF9IeRK=gFgRxMzw&HhE@u~P!d@4R<-Ds8K@5WH1_+H&C zaZ-O*V_(WDC~L>%EJ0xv)FR3}Uw6sZee!juiLZNwl$-51GVkz6(yUNj@iaaA#ffs+ zMORE$Ojk@-Z0aQRqWA*wE_}2lFK2 z((y6$PSf!u^htlAsNLbI;~I&$bbJiG({wxuebQwDal~VE^CjZaakj>DLM?{i12{8U z3hdf3tubrItR1s<4Et-g-dkfQ_4kBSNmNDs0*}!x5I{*MB$HrF$Mev~pJ!Pp5toj$ zNhsx^9_z9#=W1^RVMU@z=wbbg^)uGbSU)q}0raAH$D{|}iWJ|ghkBC03*l2wGkQWQ ziwruhC^+GfoJ9gC={S?BD%5lqpKrAiR~{}pxzn?sq#~(EDv}DyV6-Zo-H#y;N#&&e zo{%bus`00x77L)H<4mej9tpEzSI(RYL0F|PHoNe8fTSX+NGg&FI5JvAsskAEkW^0U z?>VXB527s*KuIU0!mBYIPeN}&Szpq)F^-2?mP*8><74QZrsGNIlm0^E3*?tc#HHix z&#FVMB;X@YR_p80_Gg#VpV6PupV6Pegc+^!XIEe-Ab)14zbB+hqAKnrV`2Yt0hDw? zG6}|XROlUEiZeGXX%c7>XcA}=-~^0TnS?xs z0x}7P`g=mEB$E(dy|hXIC7qB=f-xOWLZ4(3;x{H&OT?w)W9XfxqeAac^DgDxbJitX z&I$-_uB=ZqH%Dl0Xl`h3Xl~$OjCMS80|oSIOJI92*O+@M>#h|^D>L!U#RL!V>#8*C__ z(~qr4@q>6`+)#hlbhebYhgpa$XF7}fS{oB(etyx-iw{}8ujHDu|NGg&FI5JvAs$&@Pgj7i-sCe~d zg8)i8K9b65IYL{gm~sYoi4ilhRLj8>8A1cp2zm8G!y_~Hem zjRGj?_(&?J>9~?=L8v(uKB=WvpG-Ta8YHPmDw2w%0*;JUk!lD-o{%aj35gewHVL4l z<0Gk@rsGNIlc*Zch&D^ar4xnTVMRqL?<2B~r<`dami=ZEVSAFWC2R>>!WKj|S_Rva z7>X3%s}&WN+UjF9)ZY_QB~dk=5!DKyq~lDgg`uW}_!P-HeFE*YaG0bbsYoi43OF)a zMXC`Dc}OZJ^;gI3kdTz}`Zf!ZRd+)vXf-O6_cj9Dto_Q;W%sYJib7v= z-q6sLsXm{7;{SM4x`=8N6UtqbuTn{ezxMupV`lhDF0|y8|EQxYRX*m}eIJh{{aIB4IYv`blC6_BS(!s^|aH^IPivj-9)9@45HB`yY7lp}h}3 z^5|px9)IG=r=EW1+2@|$|H6wez5L3nuf6`pn{U1S&b#lu|G|eJef-I%pMCztmtTGT z&9~ou|HF?z{rtkYJUnwk4iZF%RWdm}^)3Ide%j*b_Bc~pqAqdD3 zmkk?--Y6Y0^aduOfeVwMy>Vo>oI8`Cy(xWIbtsm1E@dWRCQJfM0!-2oZxYJ{?%yq8 z5@vFbDK2Zfz-4(dRcK9Boia_L$QoFN%VOXXxCAbNOJ<@H1ul!Z%M_OlUEoqPdKKv| z!}Dk1f61jGjK{2wcwQSElZ=kqcZ}RgBl8Oj9Ys zU1B{5*27>ukiH&@k*LPVa9PgeAUBXP@`~{bGgqHm-SuX(kge!i)K4e#(FV6 z5AG7~67CZ2l9{MPfy;T^Ws1utE^uj;yKF$2cBBYgV(t=imzcYBHFr7REiT>c>aogQ z@~)mvj<}o;cL{e1cL{gNOjM$9m#%T?W|_KGxa4K(oq_i@pA@1{Qoazl zT);0)i}5J07UK;m(=HVCVtgUoCEO+4CEO)5QHcVV3%Sb_m(gCuWh2V8D@7fbi-1eu z61W5|nTbjixLm|trnq#oOx>ztoR_I*0GBAnQH-M)kE9rPjmxH9b(f6^lih$z;1ak5 zE+fI^Vz=(n%`$bX+$Aql@9v1p#VE#6jH4JwG0sd>qA12)J`=cvyM()h zyNtwLE@6V2&Rx1$rf!wHrb^Yqt2I?L zeTcpJ&f}X-K8I@slnP1(rGip18I&k$sukR2ic2?}GgwyGLs#g5e!Mv&i!$bmDm&0` zC2$E`0++xgGf{~Gmn*r;5@W9smaeZlMLxYWN&0!&C^hy5CtkZ*Fs)rrMNZ}Lg6Tdq z7$80#C5A_d;Zb6)9woL)CH@Wx5$+c;d*G0gxI8H`-1+$#u_eF5Kha<8_r+vorsw95 zE6B-<8(&xuicJXw1M#teKw2<4F_@5?5}%kFn~<*2maa*mza>iLG-tz4KaoMnO=#A2`eCW#`Xcv?tE*r-FEXJ!7m`&;a>q}9h zRZd{8LPd^>92Ge#aw3EpL`CjJf2Ppi5~XtTXZ(N*AhTdS_&&^gg?K6+85I!{JFK=b* z=jbcfz*oXo!dJpqs)mwz5njTzhJR#*J|IK3=MW6e!uSQK^j5v1I6-RF(Gt z@=-M%<%r9MjYDsgj;Tp#>D9@nX7u;dpQ1kbv=$}-CIKb^CV`!R62;`xTJEw$shm`m z*Lm|PL7k*P$%c(eWt5KPL(g9b@`)pz<%r8h>ChWgoeZF;i|smy zEyNaL3$bNIRiY5vb=*}-d{3;zQRr`4Q?>GHP1T$}p@9^2%&!OLfq7sam}izNQDA;O zcUfZWou>U=Ru}2#Wy8jr$|xPnhn}}`b){RSs~mCJFs`ZGZ2z^Yri!NI2T|0; zb_2u~Vhgc_*b*VsAjEbf_gPp|IVqJwf76<(wO4DZ7W4@Xrl@0n6EF|V1M|QIkq0+v4z+|Y>5zRIGEU4 z^|ihc{Vh={CspOGYiLq^i7cmye*#lmaCiHkPwS>4>2>SmNE*tF>TDy5WZL zUkv}T$cOoCg~CE%p|DU`CW8`1Ex47tEKw>aRpp((eEMe(DNwRuqf!~AW697vscID6 zB7BlyPdVbUVdKynrDMs^JKZL{;+2oa>Lo{9Hf$VvqjXIDnVY>C+8bBo3<&Sd7*2mb z`ds}s_%rx3_%rx3HepH>{%jj}S)x=bTquTmqNCC2+}1RHDG;ZtgP0WqYsUvMpsgnxc-&J-{V!30wk~%tR#$T<+m6OXfkG VzFpug(|inHUpf6D8{v77e*hT@P3!;w literal 0 HcmV?d00001 diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3 new file mode 100644 index 0000000000000000000000000000000000000000..cff5040f1f3e530de5f2590929f58b88c2ff8d4e GIT binary patch literal 64638 zcmeI5ca#;y62|XU78R5v$wH95>^Byx*~`K z5l|386a%2|J>Sx|s`m7>b50NMo!Op4@y}ONoO8RoX72avo|*0I^M!=_egE_?b!dvu z7gD!*)&E8K{j>eP_5=DQW#r`)q-TcZ78V3T<09iDBSIr16XRo}XSy+-2mR^*enHva8&n(NE7APnVD+-ikhxHkpoLW{`66jZ6lAe(tNGmML z%S;QDmic`CssBTx(21^~V=bNX)v8MPn^vyQTI{RpXw{Mbw9{*a)IQ_Pv+C46`<#09 z8#HWmZsR6Rn>9c0{0lC;=;9VFTeWV}wq5%U9XoaI(ls z=;22meQe?5Pdxe5)6YEn-19HI_|nU-y!zVfZ@l@|+wZ*l-lF$E_;B%(kCrZ5zM|se zm7lCy{pn|G)~;K>VdLkOUu^nv^Omo+e!Xq`j-B7^`gZr8z2AMmZ~qSmemr>Sr^7%0 za^%vEzUK^{;>b=WiZApZ`<(`Uiush_HwynCyST5q*o$*)SbcVVAxIF(+qt z{-)KR_Wv-x*6?uyPpIg(hRsKM)#gKCwBtbO43W(*Wt(T4XUv`QmX|43xSJ z=wMW;I0f3U?c@NqO6^YW>T|ZHQW}Lpw*wsyiq~cJnra!tmSM}VW!Qp4$Rc37409e@ zQ%UY`2&wE$x+T$NTPXQZRH`@y+LMGR2a^P)KWFqd!Pm)uh^eI$v3bdEA(GFm%)XojqDviRR!$Bv3s<2(juw~dXY#FxT z5V8o^uEJc<@r`YPe|qClHeF z97!rjwGLATl1j_{4Iz~sRq;u38q_(B07Zc|sbalKsz?}oCa6lP^-L-z6_bidg*cK_ zkZL`q3?!A7`-^^SJg4g7z~+^P-MDq8Gzw>?184@QYV&K@=Go@i=Go@a%O(A%1xbzn zZHC<<1kG+Z3}HCU=PB^gb1G>#hOepc#hVdO`Vmxd3bfZ$aSmXsTncW$R%sLljrlA% zR$;q=Vau>(*fMOvA!HG--H164t*IpUH-uDZ#%RK*9ihG>Q3+5KcxjR#4M%r5+tmSK zm6fF%gjE`alZB%Vz%icG>c4)hWWq9GnXpV)a0pohVJk7GYnSm3>{7vw8+NHQ3L``t z&4y#uE;q4VvR$%WvR$GlN-FGfGo}om*mr_{jzXpRJOy5w*h|CFB~_>cQXw1=3a)Gw zsWb{F3rCv+$JnXriT!3K6_bid#iRml=Dkc?^3UMTVWEGx?;rlw$an%21tyY8YdE^3N^(Fd zqt;Y$a1!%D)iu>_CKZ#4NyVf>97!rjwFgs%kVgjE`alZVfDi{V%$ z>>(yB6P5|fghh->DhPWBQ-+YrZaIN@+(tZ$07Zd`q{42aH=|-+49Aet+i}Lazifai zPJ#Avwub}QVvVq7uI$C7Gz#C3M9>mY>{Rt$T!$I93|odR!xo7usetWaOa&d^s7Hia z?r#XG>`WTRfLr@!6QC&2CRMZpQenFY1y@L=Gzu3NzR^<($JnWAQXOGZF{zkTOe(~Y zq=HmOFl8XAwA|kiQrVfbb!*=o0u%)%E@!odV@T=kE)(lsmAO=L3bZE)FtbTrqIa;2}eYC8wmO z4IeRb)aWr|$BmybaZ-9lW>z3OCpT~Ml>CCJg+;|BrDf&Qrq7r;YxbPE^X6Z9)z#Nr zd)@Un+<4Q?x7>Q$?RVUH*WLFlxc9#MA9(PghaY+Lv4xL6@#IraKlAK!&%f~EOE16j z>T9pR@#b4^zw_>Ui{Ag>!^KNJTDolcii(d{ezI!yr=P7^yKeo4jh|P3vFXdrTfW-* z^|tLhc7C(#+ueKie)s*p{XZP|@!+AK4*&eikzarN{g0!^j{o@=lZr`o%3uFTBUuHh zLNI3tsqDh^aQZcERREQ%<1*z&|%0N<~!J)CCO71WEt?`_yw*#A3 z5b1`^D~-aL=?*#z6su6ZAgRwb&o<9C&o+-O2}hGUgf4VhF-Dkc?^3UMTDdwhUW_EfQ5y z0oz8H3Oc?go>NKgZwRUEsA}CXv6uiwfi|i7dX-c?;Ub$0s*AiQ5-tE0%FIccE z_O5_n?_Cj-&Am6yo2+LRIq%ILKmWP2DH#U%-S%1XoW>B$+%N%6_a%`?(cGg8vhlTw?-r(~wm-{L)*Ge zM|i+M#|icO4;Q6QWRM*h(J7}^#feazO&h~?or;cD9EJASvsT<*d+)RFezo^M;J||p zKIG8D4nLyKk#&zc`j~ph)<5p}6HYv-!O0C9H9qCk)0)I5Bqk-Nq&7`U&uG>>vqj5R zt=nW}x6R4TYuCO*e#cIoyL2t+R@l9$N6*vGIP3hkg zmzDM_>wkIqfPsSs54mFKmBX&OdiaQuqehPzJ8t}hiIXNznL2IyjG41$&zU=K{(^9XZ3R<2sTX6?Fbue<(+8*jS#mRoPT{f;~Dy8E7c*WY*l0}np*@FR~t_V|V; zo_y-*jn6#$-19HI_|nU-Y^z$#j{`UJHfA08eC&rKY|6kw#1c(Sjh@i81l|*#bGuhOC>*CUx+jQ4( z7phbJ*LDZ^lIYJ>aMW?C`n}=b`n^157yG>%Ae~Ho9mF=|>@{FeuK}4!(x4#(!u85< zT-O^YY>p3wO(8$=d`3lKwTn_G+K?R;(Wz>Mtp|mL!Xm=>-$z&khzR2n5_tuG;CBVqv4n;RZAaAWmPWNj3#X~S(eXgrK%65f>J@Lpj1Q%)woJkpU+vM zR2pHae~kPgbUZ`^N*XpQmCR$`8SFjaK~sUO2K5f74ZY{SYM&PB}(O$gtR38UddJjN*Y1Rsg&VZ;(ERvSpV2IC6^Cl zD;}4IjjlHujwP=5y6an!%dcW99+!rVt~VNv>w1H@GS5d`sa-f4l?3ggG+EhX$CKsX zQ+3T7aV1vjVx=-xY7-zLj2l-r=98wlY~ur$nnXqem)b>Xgj_YBK*u^Rn;;EG8jds^ zX*e@cu`Vu~@F~j!`_|OYtJz9CFAd|uYdasr*J9OZ5MR3}4Z1DaiDY%+$AkDFK8O$E z6CqUN5nvic2SzFT(Z;1>VbVKFb~WF^T51-9`l?u0Z21wnjY{POK860KIaPj8bE-U=v>Be5PY33Kd0-xxSMbgUF6R09 zvT;=GXRl6;)6pnFX&0sOW|5sqmP7UG_m(q&OW+c?1TKjXs&R425hg{&)|Q~MsPDWn z@faz8NW+-;9fKMmw<9OaBC8X>8Hf+!gZLmm5kfUJ5kC+w;$_h|T>6{VLHuk;q{Y~= z4D7RMw6n(ksB(H6OT*7 zIH&3q)SRk4-G_5L#kLi4D&$nisgP3iLwPK4iBHp5Pya<#u zjFrmIHe6cJ63euZx8a)WsZ_0@R8T4?6_iSqDCR|-O&i1fdYX+&Wki3|>R;!eCL|qb zy5{k;@nqJjXt>cv8(Ryz+MZase6%|iBFJtUK%zkmCJn73&8PctK3V-yTU&@N#1>);u_Z#N#uZy8sf|iyM1M<^%1c!v zHzuAW0woP&rSh}qrPiNVR{wa<%LP!X&0Ds9vTggPSQLUqAy^dRYf*^JN~O`3K&Q~( zwEEXIs0m3&8uda?%;z8>K|+Fr1PKWdLN#te!kbf3LSpSpJsi{WKJJqxo|lGkVDD!q zF->Y>8QAk=UE@joTo50`2k}9C74^)E%E$T)ksy_z6#83Ug5m{!ypQ`75h!Wc=z2}) z=$=uAW4Y^j#R_kzJXJg{4dWo8AgF0!Cz`lL{G%sZz1Kq?f&>H!2oew^Fc}o z?Yb2Qe)hw$3YRQsOK4;=Qduh99=HT9flJ_0VILp3xa1eWjf)7~f?7oAOygb(Tw)0k zmQY~{ndcHZ2Bd0t4VQ&M#bp;7{W4Enc0hEA=n~N-qDz&N%!|qgv-~!%adhctBPFYn zagL*x^YGazl8ghFe*l-jC2+}PP^=qW`o?AVphlNnY49t^>a|PY61dy}TmqL0Ci$QS zTsqb+^ycM>{I%q1vUE!t#sP9sP>EkaQ@7HS`1zo&=zyM8uX@whaqTyL<3 z&d+wET7P1>QJad?L<~Wgiww3S&2ql>c2vNSE&pz-y^8Sr*1Ur z)ns+dcSi9E#U~V>fO(bg%!|s!EWCi25)x}4H}F!FIn?<%635%IsQ#h)rwEiBxJt#fB^#B> zi2jx<6{i#Yk(&7;P|`3~DnA=zYbqbhoQgNbUgxP)g~+LpQz559P9^H}&xWg19PS25 vsTBHKqEwnPkf(V|SN=%N0ud-_*ywtr;aK8&uYiX2kv|-)`k`)0+*0xM0JNbV&D0QNJ1kt}mWVutBPVEiZu7?lht26Cw?Vqx* zdUdDHU0(Cr*S-FzH@xvpN5A%^0)KJ(enef|qyJo%I__3m@(X{Yx+*md0aQzK8-gNUV3l}Y3vUJ(< z6)RV*UbA-H`VAX5ZQinV+x8thckSM@ci;X42X8%e+wDL6(T{)f)1UqP@GpM(t6%@- zx4--SAO85KKmX;gf4k$(yY9Z{-uv!<;K7F;e&o@|9)IG=r=EW1+2@{r;qU+W=fD1q z@#FlzqOX4vh?pUXpw>PeikR6fBI5k#NG>y1op5Awm+kt}ANzj@-x>7huI{jfe((El z{a#Rcg#At+(%q>~+z+Mo6V{7>X;4^j!KpR{$kWFsX(l zAeHJ*F-WBjN`tN;yM(MJ)gX`xqynixDpCm7M5G$TZ3ao@6Fx`%qkab@Bm)x2yh3C# zWL_PV&dgb4my*>oKNvC(nTO0n=2^=f8_E1&ZZa~l_laE}>gTm=Wj-$sRnbgU|AK}S zN~)m=NTm>23{t6s(#hhicNu-wq#6oRfm9$BNJR?anut_Gxy?AKPNjZc$5sF(PMS#N z3@4OS0~3%+A+i{xQU|4z)sO6QvYJ%GKq`<5qyni(AzTxYY8bZ}A(d}h&IP1-)Zcjm zC~+*3Drh*Nq#BlhRO-ORAeA~Ooh+`;uAtAFRKr0kkP4&%sTfC&jYu_|n~al+r{(jh zzw-r9;-raG&Tx!`-j@-bMj>CoRw6Er#n1;0$4KaXh(DV`emz@>xHuL=A2b}}q3=&2 zzk#hpT%0t8-WiUO(EEO&r&GvpWGfLD$71M%hGQi3zF+773i(ZJCF0^(41LgWjD+5I zne?TQ-^^AbE{?^}2Mxza=zW*TKnnRSY$f92SPXs8aH7x$Y=WvzPzF)wyclH=bx=B4 zJV#ndpLH2z1j-;NgP;t8G6*SzYoap92yQb%DxcW#J%bCWzY7IW;uJ^~p!mZRD852X zF%(}Nlm^W^)>hGHt@xv%_)vT(J`|s2-LaA4kLIR=;-_NIsH6U&6Z@J3GOw^x44GF4 zr8C1NtJU;b%X|(p51EI|L*`k_9UIAfj+;W|vNo-9$)(#hUb$?5TtY4(myk==M8`&Q z*}zRk>L9)nKsEJ~9~!qv${*qc*Fi=kP<+*&VoZ?LL22+q$gU-;C&*)=_)vT(J`|r6 z!ZnfNkL5Olbr7HMRa2iM^*2H)UloI&NWl+{TP%PQCzwazDP~xO%7nCy`Bcb z5F6;TCeO23b4vKz^2*fs*TfGuDP*fLQa8^N}bn+l4biW>wR^$*Rd zMyEBW;%66a^2+>V$UI~oG7p(&Eq81r^OL#B$i&{4gv4)pT_y8*anh7iIm0m$dS5yb zzY%SUV(5k4bA0ho8K(#Vhk|ka@^FWF9ilTJG3L=G(Z*$i!Yvq^W=L{i^F_J}-{NoGNHI zMndnSYJ3CM28p;h!8uiu*Kn%Xl`^N|&F)+2dek}9RKOOn1#AIZCaPm2*iPf76y;Qo z`WqpYA_K*>nk?^L+9-e$Czw=qX-x}9QV_P0)ufsRQh`(;6-Y%2;TrR#3R3@wQ~}k$ zB=4|P2QFno!aK#cdu9GS$UI~oG7p(|^_}&i>)}KB>HL+EiM_7`72mJAN#^t7Sgig9 z4aZ3672N33s3FUn3ECF|yR=TpwN{mjV=?qW!wH=v)TcEU97QK@2U(p9PRAqxlLSl> zFi9YVa7~m8re%^4-w&}x040t^Qu!tc&Tv9WmE_G-3X!EuXL(oqPA{oufK(tANCi^4 zh+@6yNM~p8SH^Ry_-2Bw0w{4Tl1c%C!tD&lNa%f3jjxVvlZcB`2z@~9IG5J6kl*08 zi+?=)yXn;KM036i09(Koumx;c85~OzL2U)aPsI%aj`|xRm5-|N1-|V9C~<;G)sR+F zjiGC7H(6c4oC#8aR3H^dMGE1Xh*UGV%^<0Q)ZYlHd^uHom1&0nN}OO)C3&Tsid`uS zn0(X!9+2veJMX&to_p`R|A7Y|diaq?AA9_XC!c!ynP;DS{slZx9S>B;1J#*Et~F1p zAoY((6;L`Go7RM+j?UU%vN|D|g@gnN2@(<{B(A=*UUaNe&El_&OzhP}n)(-Cl;0`y zd2!OTPUQ?IlvGLH9IE12%7la;hO&>ZJih2;k!LnY1yX@jAQdZvW21y*Ha8g|l`jd2 zuW0WQK#60KR6)Zr9{Tta&2EXfIKkDwBrmBl1zWyLen0(QbXs^JU<=p+wty`Y)v*z5 zFXE=8hT6HKanS__!<6pRC8K`N|MVVw@^lzN?(Y2;e- zqzY1hBc$@>e{~ddUS--VfD$K|R7qY^rD9jg0w&+|e~^NqNp&%DD&$nisgP5-h+@6y zSZce3ze-U~RY!e})ZYlH6d95C3Zw$5 zKq^uQ*F-th9Bwm6Dy6p6S4aJglPbPMvtIxuPB5vGJRDfXu9T$W8{iJn3Dl&T15$xh zAQec(IC5-iN#&@&5mNc8fAth{UZOc5fD$K|RO8aRPL-n|+(uTD3ON<-r^5YIxSz@a zlzcExsvz|@LMoNRP?+OOGzSGx;slc_$wPKke@dBC@$JvIdr5T}aw_Ch$f=N137!7a SiAZ%hx1^~0=cs>szh4A?%_;i; literal 0 HcmV?d00001 diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6 new file mode 100644 index 0000000000000000000000000000000000000000..a3b776e833117dd591d5fca82ff20f645d39b7e1 GIT binary patch literal 65397 zcmeI5cXU)m6vlTI?7d*`y%0%|YDK}WNU>6ru<6OBBG`Kc6hRO{x>A&GK@=>g1nIr^ zjs<%|)Hr+Z^~@XgoY@?WGsn+=zIlWC_TGK_`@VV0> zXnq=NIJ8$uSz1ZMS~3VVQLl0$|0ncslEg~rd0{0>sPXsz?H?~E_PU*IeQPScoK-q5 z1D{&2mE%b3Q>e9k>9VHL{B&07xC~ruy;V7mv_2ki+EB|sW0j7}z{S>Em7`i;{Q|zq zs6rA2%IfvaEp->`tHBL%3q?67dl}f8sOoVI9FOPHyRm@c!h4a3D9evEP#~pvdi6@4I9xDYY-GQ@39I1`RK|_>xO6yS&jAS2k{P)z#Nrd)@Un+;~&do0~On(ejpCZ@c}D zJ6pAG)3#mv)U@;tk&Mi&?3~=Z{DNpo5+d;Ezf zpL+V4E?v7l`&{?ud-QyvSML}5ywtbf%l!uo95i^y&{tj^Hhjd$*Ipkrdd%2y<0riF z=38%1d}q?T?@gZa{s$j^H1*?8KAl!R{j(V}XU(27ci#L33l}Y3vUJ(<6)RV*UbA-H z`VAX5ZQinV+xE}D`0}eAUw`xMci;c;<4-^T^6SpucK!Yb>c{*4zrOwn5CI`X(5x*k z(-Od?!Re~NrEyX!ur29KQWKY5flJ^LcJaTrO9Dhd6fV2*q$9!JHXF{6JTHS-u(t@v zSqTu|cs*4?eB-3_CUqd4MQRfNIS?Pj2k}9C#=548_}zIbMtrAkyv75cc4xp5r?QRZ zGsU1};L|NAZI$6j>*HM}L4TYj9hZTNt+y&iwcaA!WcwO!BE+THq-MD3fpCLxgK&dz z!wE)H6>fU+lr1iEe8pu3RXT^%#AQ$561W5|flFqhrj^5GuCKVvq~hns#pMgYC2$E` z0+$*l`9R^a7auu(A0)Vxo-MaOWZ=_%5Um`?TOW+&bEM-k@Tv7$IgYnJXz_EU<1&b~ z-r_D=p07D4ix4%B)La(!M$SRbLC!(W;V`4A$~nDxDn@)Sz9Q7^eSxpU&!IPK0V#-&%`w;jv#FV-eln_z8%wTW&~%!|Kv zdiUi&^5_R zLIOepLINiNO;t!3#8ZwtRSnnUq{a;g!0WQ)u61aN6QY^o|ALuj0oDr`r(iqs7D!+}fS61W5|nTeVz zTn^_cTU>Td0GCEeuL@imC#4F@NY{{>xJ0_#30wk~z$G(L)5@jG4hi7W2zph4OXH+e zbbHdZq$Vy$B3%NPz$I`=gwRFdawN~$rpvn%z@-uNssfkBNvXn)r0YmcT)qxm0++xg zaLG*6RN-RG)_ttrjo8FHE}r#xCAbNOW=~3sA=VJnc*ug)2R3j zad9~sxCAbNOW;z&Bp)bTj^-oVbeZWZF4L*fjihF}90Obem%t@($xPH#;c^U5+2S(G zS6p_WN;i=Lmw2Wbo@s_>nkDp1vxIS(?JF)Lgvrgo<@ya9H*Mas6}SX0fy;jvmpQ)T zGJ`O=1-Jw*VHdE=xN(615fHT)AIp=ri}74vahXY#ZY4DrzB~$UUDD=`7>6&!-+bl}3NZohrDW zXp0z>46L21$XBPzr;F^XxK1?}yifsmigY!kP+B1*c*=($rh0(&+EFQw2BUY!`!)fwfbW`s!4LblrUu*Qut% cso+#_DmazqP<)`AYC0czw5HPNU-7#C1mNT1^Z)<= literal 0 HcmV?d00001 diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7 new file mode 100644 index 0000000000000000000000000000000000000000..9488e64d83e23f4ee4acaaf8780d92127fb43fe4 GIT binary patch literal 64833 zcmeI5cXSoS7suZKDG+*z*gMvQ6d({Q3Mwih#a@&60s%ro8YPqjLN7M#1$*xmd%=Pz zprC+?ic4MD89FQ?{H=&d(JHJ{NBvr=AS!nC~xQP=KJ~1o!!|$ATXZ=>iX&5 zr(dXhZVC&sKyqHez{HIFjI2Sy#Eh(>oRVPf@WlLJL3U#I-hKNQT-b8h$Z&hOFlf(tL| zb@3&a_P(so<$e40A8^H$S6zM0wbxyL!;LrHoROJzOE7!jpq#-&hUN~-%O74)SX5jx zV&tgNrDMj98(&sFVPeIk$y26In?7Uatl78TcKaQ7&bjOEd+xpO{s$g>=;67KJo?z< zPdxe5)6YEn-19HI_|nU-y!zVfZ@l@|+w|EkV!h| zO(F<{6daSV7&|R834;0?PO4nU?-q<4P`sf{su8x5st^Xh1ENZ*B}6JB6_JWa1qvYs zhg3_j&mvL@>hB!4M%8kptukK(XQ&b)$vjmpscK7AYqe^RQZB~2WZu~7-!M4#tr)qV z=M9^xf5LDKpHt;Q%Kyg5GtL_}rCu10;i=Dul>dW~XPh@|O1&@~!&5J>32);W=MC*j z!YErUW-(lMJ0YqTb1794sFFaH1ga!}LWse!n9H!wReUQJ#s&2^oK*6{U?s2hTWf*pzxV{L zSCX$JUy0^Xh>ovZj$OKPS!%0Xj)0L?LnOJRl^$B@p_LwAD?PsDa*VBVITB>D##1g= zkh>&zN$!%|B`Aa#9Cx__`*hvqSXl5Ei(M9( z1VQ}`CzZT9R?7j!8``9rU@NJ{!9}(YqDrcDL@FW`k%~x#I1-{ms&&|95vc_AH=I=R z>ey}$DBjQ})kIrKHQv3*IG}igFA=UMQW2?$R75Jokq{kHt;a5lNF}Jh;iQsR$M$eQ z@rE|3Dr_ZHnR}5jBoxN*i-p|R3~eA%5vhn&L@LCQ5FJud|SC8)pQq>`6t_Hsb+ zhD$0tUNa;N$MDq4HOGBCPd63X4*Z`J^Q&sCGKkxT-a1TqO^63__<(J={s zVwXiGK~R6^xHWb_n{2DhPjJr=YY1mbmHAC1^Ca^m^Ca`A3l0?(eY_!F`BFFtxWAW$iiMG?Qad zcBEcwIEJL&>oSqt6^r&ty+$Qrs;!lT3irBXG1B$9|7PQ@WHHHNlEoy82?`;Gx-6zq zd%S9?ziuVL&&wN>FOFm-0X9}(v7RztNit6|Pclz3FX}t$MR+UkO8g^TxtwOJE1u+z zlr_fT&h% zxrxc7(o%mzNaaOUsou^S*Mn4kUJa(?E|P1su>BQl=1Hm=s-jU9jjCu=MH3N)dJ$fu zHJMae>Td|C6f!`PrKJwmq#mT2Ve6b~ikl499Irfx``3GFiBv=?A{CJel|hKkoT?VP zT*Wu4`lqG-&T(rz8{p?Pgvu93azX-|!m<`{rqpHqy(IG_^Ca^m^Qh%QbYvdiVQR|b zr;5VDAxkUTEY8c=oAvmq)^K!5HPhA!$yE0wvX*c%RZ{IEQW2?$R75I)Q1~HCs%-pV z3aPxxWzvc^Yib9nw1%TgDnBpGRS+4;ITdVQ&RThqDm%RGZMcxe8Z>Ma6&(}XxJg{o zX3bl)Y}L9=+jjAXv_JH)!;d)fsH2ZL_PFCuII+V?C!ccaX{RS7c1%i6N$u3ROIp`% z>1UjI*4gKr+x@)rd-S~E!i#!ce95J~FY9x8-+uiETyf=9S6_4Ob=Ti;<4rebWMXYi1rxx@1EhZhtU6_<<{Icjw2n6cx=mz7VLSTSkxl&RCE&zL!D_N}+we#f12 z?z;P)d+)pdfd?OYc^ztjOzV`YXZ@%^Rym#Jx@BI%x z{OIFPKAr#B=U;re;H!mSf3s-ul5f9Ty6pQOmaka(<4-@Y`sLT(R{u`7Bhc*#bUVTU z+>Q{E2>5YJXA$4fTTxgzJZVLnHM4_MTEj6U^6v39Jg)}WZ=*=bqRgBKG5cgPyX`xZ;R9fn92&oh@K(3`#Cf34>RCW|zh2a>I zdauhwDx0#FUa8kG3A1di9Zz$wL)HeaM{cfhuZL_h3AJPr$Rv3qoC}t@r;D}4(*}{>vzQ%Cb3u}Ml6jJOR0bhBb3uF&gDFddio(Jn zORG$*m6x%%V@@Ru$B;RdSL&sjBWvxIdJWk6c{{ywJduQLxqBV5c5pqabE;T^Ey0#x zORyCniys`=;$Foj7G4##g(DZ#-w;xHRsW>I7i&`wQqA@;EmV%@|3NC{OW}k~cO#33 q&nl^6h*U%>A{CJe6haKflgdl@1obzBR9;jqb90WhJJz;tQvD06!}j6; literal 0 HcmV?d00001 diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8 new file mode 100644 index 0000000000000000000000000000000000000000..ac4cd3bced1942348f484d2496362470115947d7 GIT binary patch literal 65397 zcmeI5cXVGj6~}*qrZj0UV4f-*>1W~D z=N1YBP8{9$e`}SxNENEiT+p_7`O+2b9W~3kR&-W3*R|BuR@c?dZ)s|1sc&koZD_5o zZ>y)@#lKXkPF1Mu>RwvYzOubzS!Ydq$Li%5cCNUfW@TshlA6=cI%nbPuI|nSYr5MP zpVzslYvuBeMV+fw7Yge9|D#Z<#HGfnQBI`_$Mro>71v+42cLBE(}xTlHhjd$iczD-j2$+2;Ed-!@A+rG;Ds+b>&0ikNOX><*jdf`$g|~=eyp0@gdW#cRIw<*$78YhVAyH^24mYrpf|?|uKeAN=q~KmN&2fA;fVT>r~o{rWe*{oU{X z@W(&>`7eL{+uv`v@ur(^x%IZ&@3`}>yYIR8zWX0|@S%qvdGxWzpZLc=|MlvLfz(fWPsQki`)_VZWvPE6qzZ}jM$_vIYgHkDk_5pvDJLPSIf-<=7e3NJ zcOp48N_t!p(N4m;3{HYqpQH1Wj!igw8I8X&)G5A`FbGZpoCG)ta1ux%Y@?loLHsO7 z;jE?py5147;PwnM?;)}jGVc{iZ)UTxYAoe_nI8<9hs;CfA@i)|men#pm>(tL*oVZ< znfOwp#h;f%o?~Li4-0CsOBWn^^tD58Yew2iRg8z4H;nT)u$A& z^$MlGk1LE-6DaS)b|_#A*aEhIEfdwU8n#3Ekx~4lzNiQZpEW6y2&qC;bs{1)UH~PD zXj1iLAeC32Qjp3klm^#UW7R~;`=lBMQh`(;6-dQ6vaBZ6Fn*LEsY1eMsedA*3Q^Sw zC)5N1lq8}_wJ`&!y!w=aR9>MpxYio0CQ;re)o_psqynixD#npzHK~U4qXbD65Vy+&k^o8)(WKgxfmB|7NYa}-lq>YoUyLR59a2~{b8l0-DAc4i=zSD#Xl$}5z{-F6yqQz-9~Y9vSn zQh`(;6)A*mbWBADEG0>*knmaRp9rZU!wEH6040e$ISE#Dq`N+1)Ai;Ig!THd6omB( zr9YJQ#;U25_X%48!h*0MEC_3(mGz<+3(6^uv2wX9gIs!uEQMTpg;H~O7^|jn^U3y~ zS2!AS3Auz^LM~YuEUN<&KJ1jd;qT+Ko{vvVeG1B%Z&%Xby>%ssvDy$P=lo=ERa} zZw69%h%5!Eyh3Tb?WC7Ci}HRlIuIQh9~afa8lcW(P?%38Vt4Kq`>RMilEsarUp0|4Ykdq3r8aA-BBO z)RUJw)C>WXBvK?*RC5yP`VeQFxjQvedR!8w>mxSD?8|FdxQoW$9Qu3l!@^3y7O(|u z0b3@jWp!9s$&ZZU=i)$bOZ^ieRfww2+?|>wfRaQssj}Ri>yfjZVIg0AGM5IMPpT;( z6-WhAfmDnm%W6{bDJyx3sVw!^q>2bgvRvnvl+5$-<<&u%p9-0W%tPiO^On4GM$0@O z^+7UU_SrvAVX3yxQiqx?{=6hoBvn*%Vo9|>uTy8c%}Z1@^fLW`1f;@#D(t7iekx{@ zZIdKbXbQ?%Cs1<)P?AWIR8h@|B~_NY@H|A8bLxzbFP|4A)ilIZh^Y`$A*Qks#d=Zv zqJqx@OFgD?)(O;H0hA<66C!5O4&*g9*<;?Es^vGx?^X6CO^4z`@uB!od{zd_>ez%2 zib>J2_xx+BzNP-Lj(wI3*}Q_w>Dcob+I7@=e+sG!G7p)D%tPiadFPClc|P_rb;sUW z8CKQe&r2dtq_Ub5OR9r;4M_HyA*Wn?_Ma*}1Ed0}Kq`=m;bd7&Dn1V^6;g#(7o3%0 zRU?3sM2e(}YK|sV`S(+~9O1~vS?W;pq{k(ZV%JACCy}lX4HIY1QPoP1OCrUtk7`aL zT^|}I&OCsslOC6Z>H3H{?<_Yddi5#iWC)+-@4nD}7jqU|Ot_eEG2vobt|e!*i^<0$ zrAR7IZK>v#`X@pvSA^L=bLIh5y#PuQDRzBSb9C275cW`BCvoWWi?^>Sf}WKB+E^T9wN&bknmB+O+mS=hFn4}A(xO#OC~v^<&yWoM$6@u zdG#*$n|7;a$mI<;-gNUVx88R99e3V!_dWOCcmD$qKJ@S-k3RPJ6WC9M{Z!ab#R6$t zEtk9*Gg>aQ+}fIyciBU7*#fzQTml!sWzb!)Ws-A8kjr$J^_{gn)gZIQl89FP!+CX( z50K)w1{J>+^9h(wz literal 0 HcmV?d00001 diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9 new file mode 100644 index 0000000000000000000000000000000000000000..88cdf8fdd7a9af6aea26a8f8fff4dc2ea9a57960 GIT binary patch literal 65014 zcmeI5cXSm+6vp2uV8LFncg0E|K?nq-CP0ADyR;;}04gLTp^4J$z0i9??;XV66}w{Z z1u6Dk5i#!GJD%BL&zXsy$L!(DKX-O=lG&Nv_xrv(GjBH*i)|FKJ^gfS+o7y_!o?yk zV#)c1{gSc^vUB^#ld^M*2Mmo59Gp}TFU(77oYlT#aeiSueMn(;&Xw^_`2_=VJH?BN zMU{X5rx+wgidfam^t7A-{RU>|CiTxB7_U}4IW@UPwdCZwskLgRrqrrkqh_6IDQPw7 z&-k1;PWS6YoquA6cWO`Y@90k`_~$)4rJUz&ajq%UBA*Ax@qWFi%TdHC6%ENLN~<9Y zeitfM=RcwUM8WGq#mdK*9nKN|VeHB`1hLrvalvL$=NnL;WKS!URfsB^H|5ohmDQCU ziT(DkP_fbh2Oe~ALwJnHCUj;(Us@h6;k(#fZsdfMq{oO#yS=TtrSyz?)( z@S?bJGT)9Tf~q(Q?*jhi%0&uG@XMP|!Zt+U#+ZP&g-$4;HQbnVu? z$E7`c_3qO*J16(DcwWE$11`Vf%7KIO3kDY!6%QFYZ1{+)MqYi*sL^A_j=T1{>uN?-@T~;-tw_rcRqaW9F>cC8cxb&YQnr;iAP$mM&YqV&$sUYu2t? zzv14E_uc=%gAYCY$fJ)v{=}0{J^jq4XP zRO=JpDWx)^zavT&rm8di;T35dr9jDou~JR5P%3Tg5}{ODqjYagqRuy@V_m80L8+ir zQ0jf&sSqFn!BeVwT;Lvm^#GNsXlegC|Bd2Fu&IqOTs+uOoCl~j?&M|0w+O6a1or_hzEDH0MSBuGe*kPtD{;3-v8u5)|}Y9hhz5{^=!WWhzH3d=H-?l>~^ zVLB7oOduM_5tjwy*}oYUVylf^BE(i}ls+F*=)U96i!Qbq5L<{X#1>*p#887Lwi#UK z6F(9;l|p~hoNB%W<~7+#1m?9y>B^M)QKZwcj``-mJTMQ;1M|#sC3=`|&Q*>W`!G)% z*n1@!N5?l>~^VLB7oOduM`5tjwyoNA_p*lM>zBE(i}ls+F*{V3QLL2R+q zhNU(vwOK8-wNS-25;>JZe@B!m4AO!95TdaZC|QV-Nl?1u$k2zGgc8z6THq5+IWlr&l&;Ji>MhTm=$LN_%meelJTOnhP=km0mR#qEu@B3s0-Fg$Q|aerAxb%w z(j8N!nr)#}+BhacskBDv=HiE;@Wg|zRIQ*?P%0=Dl#1m@iJnrm;wn>IF0{a<=CKoj zORZ7r;aoooPQidnEb?HH2a7ybi#*o2EV00)X7mz)ORZ7DB|j91QyJhAxCAbNODkNq zR@`OG*{`3Lk~gH?wjy0lf5?K1^B`eXSLu%9Q&54uS0Y1>xGcCh^kK58bjR_b53Fd5 zW^%-3!FXAJk%iCEZreoo9Ia78(>(gsN;eljBZW=1u2h4dR8T4?6_kqQ zNQs_O<#UyTN@YZUN0cfI(t$;J(LxH8EEp@*Vhg3x#x4;`r8P=7*L*(;wnb1X%pGIy z7<0!~bH^?#l@a|NQK~RV2lhjVOes*Z;NrxqQFpwdH`t|a^Ik3OnkPbGwMI!{`C%w+ z=vY_S0_1|o1(6FP7bHTc!OH~)bDbkf6$ZP&Rus`v3Y09ks8mMXab)PjR5h?VCR)i6 zmj&Z=c1cvz*#-XRM6{*Pi!Qc>5L<{X#1>-9imF6UYzw){C%#ilghuo?&8ckOtEEZg z-pr}^VJPiFF<%7C1M|Qx)y<)Bg-(ce37gI(%2 zFR|1b-kXa&rT!HX9qCHxm`6gg6PO3)fq7=R68Dgh7~pb6R9*2B!ey6GT#kS%1}=e1 z;8MXPA9%PN!AGX<(&kM)PPt2dC{S16^3yG!ZQZtg2Nq_rFpGs*tA$x>T&|3&yIksH zQgj0@flJ5*tdX8m;88??xDE63duNRUV)Uyk?1TL8jO7sw6jZ2%iPB`T*`MElm(je$7 z<0IiN;Vywo;8MXPACwQ5tE1{Jm;3z|JwtIh3b+I=flJ_$38_R6m!r7KG#R&f{=QT0 zlD}i#3%JBm50-kc)Ki|Np3%0rToYAyxx(+M=pBm7(Qub=mvEPGm&`;ZdhXI1musVn z%aw%7KENg1CEO+4WkTGg&D#T;c^@wnI_}+MirN9{ho?A^;l2FflJI?V(t=imyAR;mJgR3ql(LQgv&hO5*vxI wkq8@!%CnKk+Fjbb-pQ$zaelp1zffFa?h@`2?h@`&b(MVNxyx($s4N-(8@XiK-T(jq literal 0 HcmV?d00001