diff --git a/activemq-optional/pom.xml b/activemq-optional/pom.xml
index 6789c26c68..b26b82e629 100755
--- a/activemq-optional/pom.xml
+++ b/activemq-optional/pom.xml
@@ -44,7 +44,11 @@
${pom.groupId}
activeio-core
-
+
+ ${pom.groupId}
+ activemq-console
+
+
org.springframework
spring
@@ -153,7 +157,22 @@
org.codehaus.jettison
jettison
test
-
+
+
+
+ velocity
+ velocity
+
+
+ net.sf.josql
+ josql
+
+
+ net.sf.josql
+ gentlyweb-utils
+
+
+
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java
new file mode 100644
index 0000000000..736d012007
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.josql.Query;
+
+/**
+ * Allows you to view the contents of a Journal.
+ *
+ * @author Hiram Chirino
+ */
+public class AMQJournalTool {
+
+ private final ArrayList dirs = new ArrayList();
+ private final WireFormat wireFormat = new OpenWireFormat();
+ private final HashMap resources = new HashMap();
+
+ private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
+ private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
+ private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
+ private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
+ private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
+ private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
+ private String where;
+ private VelocityContext context;
+ private VelocityEngine velocity;
+ private boolean help;
+
+ public static void main(String[] args) throws Exception {
+ AMQJournalTool consumerTool = new AMQJournalTool();
+ String[] directories = CommandLineSupport
+ .setOptions(consumerTool, args);
+ if (directories.length < 1) {
+ System.out
+ .println("Please specify the directories with journal data to scan");
+ return;
+ }
+ for (int i = 0; i < directories.length; i++) {
+ consumerTool.getDirs().add(new File(directories[i]));
+ }
+ consumerTool.execute();
+ }
+
+ public void execute() throws Exception {
+
+ if( help ) {
+ showHelp();
+ return;
+ }
+
+ if (getDirs().size() < 1) {
+ System.out.println("");
+ System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
+ System.out.println("");
+ showHelp();
+ return;
+ }
+
+ for (File dir : getDirs()) {
+ if( !dir.exists() ) {
+ System.out.println("");
+ System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
+ System.out.println("");
+ showHelp();
+ return;
+ }
+ if( !dir.isDirectory() ) {
+ System.out.println("");
+ System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
+ System.out.println("");
+ showHelp();
+ return;
+ }
+ }
+
+
+ context = new VelocityContext();
+ List keys = Arrays.asList(context.getKeys());
+
+ for (Iterator iterator = System.getProperties().entrySet()
+ .iterator(); iterator.hasNext();) {
+ Map.Entry kv = (Map.Entry) iterator.next();
+ String name = (String) kv.getKey();
+ String value = (String) kv.getValue();
+
+ if (!keys.contains(name)) {
+ context.put(name, value);
+ }
+ }
+
+ velocity = new VelocityEngine();
+ velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
+ velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
+ velocity.init();
+
+
+ resources.put("message", messageFormat);
+ resources.put("topicAck", topicAckFormat);
+ resources.put("queueAck", queueAckFormat);
+ resources.put("transaction", transactionFormat);
+ resources.put("trace", traceFormat);
+ resources.put("unknown", unknownFormat);
+
+ Query query = null;
+ if (where != null) {
+ query = new Query();
+ query.parse("select * from "+Entry.class.getName()+" where "+where);
+
+ }
+
+ ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
+ manager.start();
+ try {
+ Location curr = manager.getFirstLocation();
+ while (curr != null) {
+
+ ByteSequence data = manager.read(curr);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+ Entry entry = new Entry();
+ entry.setLocation(curr);
+ entry.setRecord(c);
+ entry.setData(data);
+ entry.setQuery(query);
+ process(entry);
+
+ curr = manager.getNextLocation(curr);
+ }
+ } finally {
+ manager.close();
+ }
+ }
+
+ private void showHelp() {
+ InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
+ Scanner scanner = new Scanner(is);
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine();
+ System.out.println(line);
+ }
+ scanner.close(); }
+
+ private void process(Entry entry) throws Exception {
+
+ Location location = entry.getLocation();
+ DataStructure record = entry.getRecord();
+
+ switch (record.getDataStructureType()) {
+ case ActiveMQMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQBytesMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQBlobMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQMapMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQObjectMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQStreamMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
+ entry.setType("ActiveMQTextMessage");
+ entry.setFormater("message");
+ display(entry);
+ break;
+ case JournalQueueAck.DATA_STRUCTURE_TYPE:
+ entry.setType("Queue Ack");
+ entry.setFormater("queueAck");
+ display(entry);
+ break;
+ case JournalTopicAck.DATA_STRUCTURE_TYPE:
+ entry.setType("Topic Ack");
+ entry.setFormater("topicAck");
+ display(entry);
+ break;
+ case JournalTransaction.DATA_STRUCTURE_TYPE:
+ entry.setType(getType((JournalTransaction) record));
+ entry.setFormater("transaction");
+ display(entry);
+ break;
+ case JournalTrace.DATA_STRUCTURE_TYPE:
+ entry.setType("Trace");
+ entry.setFormater("trace");
+ display(entry);
+ break;
+ default:
+ entry.setType("Unknown");
+ entry.setFormater("unknown");
+ display(entry);
+ break;
+ }
+ }
+
+ private String getType(JournalTransaction record) {
+ switch (record.getType()) {
+ case JournalTransaction.XA_PREPARE:
+ return "XA Prepare";
+ case JournalTransaction.XA_COMMIT:
+ return "XA Commit";
+ case JournalTransaction.XA_ROLLBACK:
+ return "XA Rollback";
+ case JournalTransaction.LOCAL_COMMIT:
+ return "Commit";
+ case JournalTransaction.LOCAL_ROLLBACK:
+ return "Rollback";
+ }
+ return "Unknown Transaction";
+ }
+
+ private void display(Entry entry) throws Exception {
+
+ if (entry.getQuery() != null) {
+ List list = Collections.singletonList(entry);
+ List results = entry.getQuery().execute(list).getResults();
+ if (results.isEmpty()) {
+ return;
+ }
+ }
+
+ CustomResourceLoader.setResources(resources);
+ try {
+
+ context.put("location", entry.getLocation());
+ context.put("record", entry.getRecord());
+ context.put("type", entry.getType());
+ if (entry.getRecord() instanceof ActiveMQMessage) {
+ context.put("body", new MessageBodyFormatter(
+ (ActiveMQMessage) entry.getRecord()));
+ }
+
+ Template template = velocity.getTemplate(entry.getFormater());
+ PrintWriter writer = new PrintWriter(System.out);
+ template.merge(context, writer);
+ writer.println();
+ writer.flush();
+ } finally {
+ CustomResourceLoader.setResources(null);
+ }
+ }
+
+ public void setMessageFormat(String messageFormat) {
+ this.messageFormat = messageFormat;
+ }
+
+ public void setTopicAckFormat(String ackFormat) {
+ this.topicAckFormat = ackFormat;
+ }
+
+ public void setTransactionFormat(String transactionFormat) {
+ this.transactionFormat = transactionFormat;
+ }
+
+ public void setTraceFormat(String traceFormat) {
+ this.traceFormat = traceFormat;
+ }
+
+ public void setUnknownFormat(String unknownFormat) {
+ this.unknownFormat = unknownFormat;
+ }
+
+ public void setQueueAckFormat(String queueAckFormat) {
+ this.queueAckFormat = queueAckFormat;
+ }
+
+ public String getQuery() {
+ return where;
+ }
+
+ public void setWhere(String query) {
+ this.where = query;
+ }
+
+ public boolean isHelp() {
+ return help;
+ }
+
+ public void setHelp(boolean help) {
+ this.help = help;
+ }
+
+ /**
+ * @return the dirs
+ */
+ public ArrayList getDirs() {
+ return dirs;
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java
new file mode 100644
index 0000000000..e3fb5a2609
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.console.command.Command;
+
+public class AMQJournalToolCommand implements Command {
+
+ private CommandContext context;
+
+ public void execute(List tokens) throws Exception {
+ AMQJournalTool consumerTool = new AMQJournalTool();
+ String args[] = new String[tokens.size()];
+ tokens.toArray(args);
+ String[] directories = CommandLineSupport.setOptions(consumerTool, args);
+ for (int i = 0; i < directories.length; i++) {
+ consumerTool.getDirs().add(new File(directories[i]));
+ }
+ consumerTool.execute();
+ }
+
+ public void setCommandContext(CommandContext context) {
+ this.context = context;
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java
new file mode 100644
index 0000000000..bffba306fb
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * Helper utility that can be used to set the properties on any object using
+ * command line arguments.
+ *
+ * @author Hiram Chirino
+ */
+public final class CommandLineSupport {
+
+ private CommandLineSupport() {
+ }
+
+ /**
+ * Sets the properties of an object given the command line args.
+ *
+ * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
+ *
+ * then it will try to call the following setters on the target object.
+ *
+ * target.setAckMode("AUTO");
+ * target.setURL(new URI("tcp://localhost:61616") );
+ * target.setPersistent(true);
+ *
+ * Notice the the proper conversion for the argument is determined by examining the
+ * setter arguement type.
+ *
+ * @param target the object that will have it's properties set
+ * @param args the commline options
+ * @return any arguments that are not valid options for the target
+ */
+ public static String[] setOptions(Object target, String[] args) {
+ ArrayList rc = new ArrayList();
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i] == null) {
+ continue;
+ }
+
+ if (args[i].startsWith("--")) {
+
+ // --options without a specified value are considered boolean
+ // flags that are enabled.
+ String value = "true";
+ String name = args[i].substring(2);
+
+ // if --option=value case
+ int p = name.indexOf("=");
+ if (p > 0) {
+ value = name.substring(p + 1);
+ name = name.substring(0, p);
+ }
+
+ // name not set, then it's an unrecognized option
+ if (name.length() == 0) {
+ rc.add(args[i]);
+ continue;
+ }
+
+ String propName = convertOptionToPropertyName(name);
+ if (!IntrospectionSupport.setProperty(target, propName, value)) {
+ rc.add(args[i]);
+ continue;
+ }
+ } else {
+ rc.add(args[i]);
+ }
+
+ }
+
+ String r[] = new String[rc.size()];
+ rc.toArray(r);
+ return r;
+ }
+
+ /**
+ * converts strings like: test-enabled to testEnabled
+ *
+ * @param name
+ * @return
+ */
+ private static String convertOptionToPropertyName(String name) {
+ String rc = "";
+
+ // Look for '-' and strip and then convert the subsequent char to
+ // uppercase
+ int p = name.indexOf("-");
+ while (p > 0) {
+ // strip
+ rc += name.substring(0, p);
+ name = name.substring(p + 1);
+
+ // can I convert the next char to upper?
+ if (name.length() > 0) {
+ rc += name.substring(0, 1).toUpperCase();
+ name = name.substring(1);
+ }
+
+ p = name.indexOf("-");
+ }
+ return rc + name;
+ }
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java
new file mode 100644
index 0000000000..7a44018c73
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.collections.ExtendedProperties;
+import org.apache.velocity.exception.ResourceNotFoundException;
+import org.apache.velocity.runtime.RuntimeServices;
+import org.apache.velocity.runtime.resource.Resource;
+import org.apache.velocity.runtime.resource.loader.FileResourceLoader;
+import org.apache.velocity.runtime.resource.loader.ResourceLoader;
+
+public class CustomResourceLoader extends ResourceLoader {
+
+ private final static ThreadLocal> resourcesTL = new ThreadLocal>();
+ private final FileResourceLoader fileResourceLoader = new FileResourceLoader();
+
+ @Override
+ public void commonInit(RuntimeServices rs, ExtendedProperties configuration) {
+ super.commonInit(rs, configuration);
+ fileResourceLoader.commonInit(rs, configuration);
+ }
+
+ public void init( ExtendedProperties configuration)
+ {
+ fileResourceLoader.init(configuration);
+ }
+
+ /**
+ */
+ public synchronized InputStream getResourceStream( String name )
+ throws ResourceNotFoundException
+ {
+ InputStream result = null;
+
+ if (name == null || name.length() == 0)
+ {
+ throw new ResourceNotFoundException ("No template name provided");
+ }
+
+ String value = null;
+ HashMap resources = resourcesTL.get();
+ if( resources!=null ) {
+ value = resources.get(name);
+ }
+
+ if( value == null ) {
+ result = this.fileResourceLoader.getResourceStream(name);
+ } else {
+ try
+ {
+ result = new ByteArrayInputStream(value.getBytes());
+ }
+ catch( Exception e )
+ {
+ throw new ResourceNotFoundException( e.getMessage() );
+ }
+ }
+ return result;
+ }
+
+ public boolean isSourceModified(Resource resource)
+ {
+ return false;
+ }
+
+ public long getLastModified(Resource resource)
+ {
+ return 0;
+ }
+
+ static public HashMap getResources() {
+ return resourcesTL.get();
+ }
+
+ static public void setResources(HashMap arg0) {
+ resourcesTL.set(arg0);
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java
new file mode 100644
index 0000000000..17cd104b10
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.util.ByteSequence;
+import org.josql.Query;
+
+public class Entry {
+
+ Location location;
+ DataStructure record;
+ private ByteSequence data;
+ private String type;
+ private String formater;
+ private Query query;
+
+ public Location getLocation() {
+ return location;
+ }
+ public void setLocation(Location location) {
+ this.location = location;
+ }
+ public DataStructure getRecord() {
+ return record;
+ }
+ public void setRecord(DataStructure record) {
+ this.record = record;
+ }
+ public void setData(ByteSequence data) {
+ this.data = data;
+ }
+ public void setType(String type) {
+ this.type = type;
+ }
+ public ByteSequence getData() {
+ return data;
+ }
+ public String getType() {
+ return type;
+ }
+ public void setFormater(String formater) {
+ this.formater = formater;
+ }
+ public String getFormater() {
+ return formater;
+ }
+ public void setQuery(Query query) {
+ this.query = query;
+ }
+ public Query getQuery() {
+ return query;
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java
new file mode 100644
index 0000000000..e2d23dc32c
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.ByteSequence;
+
+public class MessageBodyFormatter {
+ final ActiveMQMessage message;
+
+ public MessageBodyFormatter(ActiveMQMessage message) {
+ this.message=message;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ switch (message.getDataStructureType()) {
+ case ActiveMQMessage.DATA_STRUCTURE_TYPE:
+ return "";
+ case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
+ ActiveMQBlobMessage blob = (ActiveMQBlobMessage) message;
+ return blob.getRemoteBlobUrl();
+ case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
+ ActiveMQMapMessage map = (ActiveMQMapMessage)message;
+ return map.getContentMap().toString();
+ case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
+ ActiveMQTextMessage text = (ActiveMQTextMessage)message;
+ return text.getText();
+ case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
+ case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
+ case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
+ ByteSequence data = message.getContent();
+ return "binary payload {length="+data.getLength()+", compressed="+message.isCompressed()+"}";
+ }
+ } catch (JMSException e) {
+ }
+ return "";
+ }
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java
new file mode 100644
index 0000000000..1e3e0a502f
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import java.util.Iterator;
+
+import javax.jms.Message;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * An Iterator for the AMQReader
+ *
+ */
+class AMQIterator implements Iterator{
+ private AMQReader reader;
+ private BooleanExpression expression;
+ private MessageLocation currentLocation;
+ private MessageLocation nextLocation;
+ private boolean valid=true;
+
+
+ AMQIterator(AMQReader reader, BooleanExpression expression){
+ this.reader=reader;
+ this.expression=expression;
+ }
+
+ public boolean hasNext() {
+ try {
+ this.nextLocation = reader.getNextMessage(currentLocation);
+ Message next = nextLocation != null ? nextLocation.getMessage()
+ : null;
+ if (expression == null) {
+ return next != null;
+ } else {
+ while (next != null) {
+ MessageEvaluationContext context = new MessageEvaluationContext();
+ context.setMessageReference((MessageReference) next);
+ if (expression.matches(context)) {
+ return true;
+ }
+ this.nextLocation = reader.getNextMessage(currentLocation);
+ next = nextLocation != null ? nextLocation.getMessage()
+ : null;
+ }
+ valid=false;
+ return false;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to get next message from reader ", e);
+ }
+ }
+
+
+ public Message next() {
+ if (valid && (nextLocation != null || hasNext())) {
+ this.currentLocation=nextLocation;
+ return nextLocation.getMessage();
+ }
+ return null;
+ }
+
+
+ public void remove() {
+ throw new IllegalStateException("Not supported");
+
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java
new file mode 100644
index 0000000000..0b90924351
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.Message;
+
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Reads and iterates through data log files for the AMQMessage Store
+ *
+ */
+public class AMQReader implements Iterable {
+
+ private AsyncDataManager dataManager;
+ private WireFormat wireFormat = new OpenWireFormat();
+ private File file;
+ private BooleanExpression expression;
+
+ /**
+ * List all the data files in a directory
+ * @param directory
+ * @return
+ * @throws IOException
+ */
+ public static Set listDataFiles(File directory) throws IOException{
+ Setresult = new HashSet();
+ if (directory == null || !directory.exists() || !directory.isDirectory()) {
+ throw new IOException("Invalid Directory " + directory);
+ }
+ AsyncDataManager dataManager = new AsyncDataManager();
+ dataManager.setDirectory(directory);
+ dataManager.start();
+ Set set = dataManager.getFiles();
+ if (set != null) {
+ result.addAll(set);
+ }
+ dataManager.close();
+ return result;
+ }
+ /**
+ * Create the AMQReader to read a directory of amq data logs - or an
+ * individual data log file
+ *
+ * @param file the directory - or file
+ * @throws IOException
+ * @throws InvalidSelectorException
+ * @throws IOException
+ * @throws InvalidSelectorException
+ */
+ public AMQReader(File file) throws InvalidSelectorException, IOException {
+ this(file,null);
+ }
+
+ /**
+ * Create the AMQReader to read a directory of amq data logs - or an
+ * individual data log file
+ *
+ * @param file the directory - or file
+ * @param selector the JMS selector or null to select all
+ * @throws IOException
+ * @throws InvalidSelectorException
+ */
+ public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
+ String str = selector != null ? selector.trim() : null;
+ if (str != null && str.length() > 0) {
+ this.expression=SelectorParser.parse(str);
+ }
+ dataManager = new AsyncDataManager();
+ dataManager.setArchiveDataLogs(false);
+ if (file.isDirectory()) {
+ dataManager.setDirectory(file);
+ } else {
+ dataManager.setDirectory(file.getParentFile());
+ dataManager.setDirectoryArchive(file);
+ this.file = file;
+ }
+ dataManager.start();
+ }
+
+ public Iterator iterator() {
+ return new AMQIterator(this,this.expression);
+ }
+
+
+ protected MessageLocation getNextMessage(MessageLocation lastLocation)
+ throws IllegalStateException, IOException {
+ if (this.file != null) {
+ return getInternalNextMessage(this.file, lastLocation);
+ }
+ return getInternalNextMessage(lastLocation);
+ }
+
+ private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
+ throws IllegalStateException, IOException {
+ return getInternalNextMessage(null, lastLocation);
+ }
+
+ private MessageLocation getInternalNextMessage(File file,
+ MessageLocation lastLocation) throws IllegalStateException,
+ IOException {
+ MessageLocation result = lastLocation;
+ if (result != null) {
+ result.setMessage(null);
+ }
+ Message message = null;
+ Location pos = lastLocation != null ? lastLocation.getLocation() : null;
+ while ((pos = getNextLocation(file, pos)) != null) {
+ message = getMessage(pos);
+ if (message != null) {
+ if (result == null) {
+ result = new MessageLocation();
+ }
+ result.setMessage(message);
+ break;
+ }
+ }
+ result.setLocation(pos);
+ if (pos == null && message == null) {
+ result = null;
+ } else {
+ result.setLocation(pos);
+ }
+ return result;
+ }
+
+ private Location getNextLocation(File file, Location last)
+ throws IllegalStateException, IOException {
+ if (file != null) {
+ return dataManager.getNextLocation(file, last, true);
+ }
+ return dataManager.getNextLocation(last);
+ }
+
+ private Message getMessage(Location location) throws IOException {
+ ByteSequence data = dataManager.read(location);
+ DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+ if (c instanceof Message) {
+ return (Message) c;
+ }
+ return null;
+
+ }
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java
new file mode 100644
index 0000000000..e4c68c6fee
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import javax.jms.Message;
+
+import org.apache.activemq.kaha.impl.async.Location;
+
+/**
+ * A holder for a message
+ *
+ */
+class MessageLocation {
+ private Message message;
+ private Location location;
+
+
+ /**
+ * @return the location
+ */
+ public Location getLocation() {
+ return location;
+ }
+
+ /**
+ * @param location
+ */
+ public void setLocation(Location location) {
+ this.location = location;
+ }
+ /**
+ * @return the message
+ */
+ public Message getMessage() {
+ return message;
+ }
+
+ /**
+ * @param message
+ */
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java
new file mode 100644
index 0000000000..970739d30c
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DiscoveryRegistryServlet extends HttpServlet {
+
+ private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
+ long maxKeepAge = 1000*60*60; // 1 hour.
+ ConcurrentHashMap> serviceGroups = new ConcurrentHashMap>();
+
+ @Override
+ protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ String group = req.getPathInfo();
+ String service = req.getHeader("service");
+ LOG.debug("Registering: group="+group+", service="+service);
+
+ ConcurrentHashMap services = getServiceGroup(group);
+ services.put(service, System.currentTimeMillis());
+ }
+
+ private ConcurrentHashMap getServiceGroup(String group) {
+ ConcurrentHashMap rc = serviceGroups.get(group);
+ if( rc == null ) {
+ rc = new ConcurrentHashMap();
+ serviceGroups.put(group, rc);
+ }
+ return rc;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ try {
+ long freshness = 1000*30;
+ String p = req.getParameter("freshness");
+ if( p!=null ) {
+ freshness = Long.parseLong(p);
+ }
+
+ String group = req.getPathInfo();
+ LOG.debug("group="+group);
+ ConcurrentHashMap services = getServiceGroup(group);
+ PrintWriter writer = resp.getWriter();
+
+ long now = System.currentTimeMillis();
+ long dropTime = now-maxKeepAge;
+ long minimumTime = now-freshness;
+
+ ArrayList dropList = new ArrayList();
+ for (Map.Entry entry : services.entrySet()) {
+ if( entry.getValue() > minimumTime ) {
+ writer.println(entry.getKey());
+ } else if( entry.getValue() < dropTime ) {
+ dropList.add(entry.getKey());
+ }
+ }
+
+ // We might as well get rid of the really old entries.
+ for (String service : dropList) {
+ services.remove(service);
+ }
+
+
+ } catch (Exception e) {
+ resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error occured: "+e);
+ }
+ }
+
+ @Override
+ protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ String group = req.getPathInfo();
+ String service = req.getHeader("service");
+ LOG.debug("Unregistering: group="+group+", service="+service);
+
+ ConcurrentHashMap services = getServiceGroup(group);
+ services.remove(service);
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
new file mode 100644
index 0000000000..c26ae7360e
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.net.URI;
+
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+public class EmbeddedJettyServer implements org.apache.activemq.Service {
+
+ private HTTPDiscoveryAgent agent;
+ private Server server;
+ private SelectChannelConnector connector;
+ private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet();
+
+ public void start() throws Exception {
+ URI uri = new URI(agent.getRegistryURL());
+
+ server = new Server();
+ Context context = new Context(Context.NO_SECURITY | Context.NO_SESSIONS);
+
+ context.setContextPath("/");
+ ServletHolder holder = new ServletHolder();
+ holder.setServlet(camelServlet);
+ context.addServlet(holder, "/*");
+ server.setHandler(context);
+ server.start();
+
+ int port = 80;
+ if( uri.getPort() >=0 ) {
+ port = uri.getPort();
+ }
+
+ connector = new SelectChannelConnector();
+ connector.setPort(port);
+ server.addConnector(connector);
+ connector.start();
+ }
+
+ public void stop() throws Exception {
+ if( connector!=null ) {
+ connector.stop();
+ connector = null;
+ }
+ if( server!=null ) {
+ server.stop();
+ server = null;
+ }
+ }
+
+ public HTTPDiscoveryAgent getAgent() {
+ return agent;
+ }
+
+ public void setAgent(HTTPDiscoveryAgent agent) {
+ this.agent = agent;
+ }
+
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
new file mode 100644
index 0000000000..46e2328bd1
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class HTTPDiscoveryAgent implements DiscoveryAgent {
+
+ private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
+
+ private String registryURL = "http://localhost:8080/discovery-registry/default";
+ private HttpClient httpClient = new HttpClient();
+ private AtomicBoolean running=new AtomicBoolean();
+ private final AtomicReference discoveryListener = new AtomicReference();
+ private final HashSet registeredServices = new HashSet();
+ private final HashMap discoveredServices = new HashMap();
+ private Thread thread;
+ private long updateInterval = 1000*10;
+ private String brokerName;
+ private boolean startEmbeddRegistry=false;
+ private Service jetty;
+ private AtomicInteger startCounter=new AtomicInteger(0);
+
+
+ private long initialReconnectDelay = 1000;
+ private long maxReconnectDelay = 1000 * 30;
+ private long backOffMultiplier = 2;
+ private boolean useExponentialBackOff=true;
+ private int maxReconnectAttempts;
+ private final Object sleepMutex = new Object();
+ private long minConnectTime = 5000;
+
+ class SimpleDiscoveryEvent extends DiscoveryEvent {
+
+ private int connectFailures;
+ private long reconnectDelay = initialReconnectDelay;
+ private long connectTime = System.currentTimeMillis();
+ private AtomicBoolean failed = new AtomicBoolean(false);
+ private AtomicBoolean removed = new AtomicBoolean(false);
+
+ public SimpleDiscoveryEvent(String service) {
+ super(service);
+ }
+
+ }
+
+
+ public String getGroup() {
+ return null;
+ }
+
+ public void registerService(String service) throws IOException {
+ synchronized(registeredServices) {
+ registeredServices.add(service);
+ }
+ doRegister(service);
+ }
+
+ synchronized private void doRegister(String service) {
+ String url = registryURL;
+ try {
+ PutMethod method = new PutMethod(url);
+// method.setParams(createParams());
+ method.setRequestHeader("service", service);
+ int responseCode = httpClient.executeMethod(method);
+ LOG.debug("PUT to "+url+" got a "+responseCode);
+ } catch (Exception e) {
+ LOG.debug("PUT to "+url+" failed with: "+e);
+ }
+ }
+
+ synchronized private void doUnRegister(String service) {
+ String url = registryURL;
+ try {
+ DeleteMethod method = new DeleteMethod(url);
+// method.setParams(createParams());
+ method.setRequestHeader("service", service);
+ int responseCode = httpClient.executeMethod(method);
+ LOG.debug("DELETE to "+url+" got a "+responseCode);
+ } catch (Exception e) {
+ LOG.debug("DELETE to "+url+" failed with: "+e);
+ }
+ }
+
+// private HttpMethodParams createParams() {
+// HttpMethodParams params = new HttpMethodParams();
+// params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false));
+// return params;
+// }
+
+ synchronized private Set doLookup(long freshness) {
+ String url = registryURL+"?freshness="+freshness;
+ try {
+ GetMethod method = new GetMethod(url);
+// method.setParams(createParams());
+ int responseCode = httpClient.executeMethod(method);
+ LOG.debug("GET to "+url+" got a "+responseCode);
+ if( responseCode == 200 ) {
+ Set rc = new HashSet();
+ Scanner scanner = new Scanner(method.getResponseBodyAsStream());
+ while( scanner.hasNextLine() ) {
+ String service = scanner.nextLine();
+ if( service.trim().length() != 0 ) {
+ rc.add(service);
+ }
+ }
+ return rc;
+ } else {
+ LOG.debug("GET to "+url+" failed with response code: "+responseCode);
+ return null;
+ }
+ } catch (Exception e) {
+ LOG.debug("GET to "+url+" failed with: "+e);
+ return null;
+ }
+ }
+
+ public void serviceFailed(DiscoveryEvent devent) throws IOException {
+
+ final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
+ if (event.failed.compareAndSet(false, true)) {
+ discoveryListener.get().onServiceRemove(event);
+ if(!event.removed.get()) {
+ // Setup a thread to re-raise the event...
+ Thread thread = new Thread() {
+ public void run() {
+
+ // We detect a failed connection attempt because the service
+ // fails right away.
+ if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
+ LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event);
+
+ event.connectFailures++;
+
+ if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
+ LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
+ return;
+ }
+
+ synchronized (sleepMutex) {
+ try {
+ if (!running.get() || event.removed.get()) {
+ return;
+ }
+ LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
+ sleepMutex.wait(event.reconnectDelay);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+
+ if (!useExponentialBackOff) {
+ event.reconnectDelay = initialReconnectDelay;
+ } else {
+ // Exponential increment of reconnect delay.
+ event.reconnectDelay *= backOffMultiplier;
+ if (event.reconnectDelay > maxReconnectDelay) {
+ event.reconnectDelay = maxReconnectDelay;
+ }
+ }
+
+ } else {
+ event.connectFailures = 0;
+ event.reconnectDelay = initialReconnectDelay;
+ }
+
+ if (!running.get() || event.removed.get()) {
+ return;
+ }
+
+ event.connectTime = System.currentTimeMillis();
+ event.failed.set(false);
+ discoveryListener.get().onServiceAdd(event);
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
+ }
+ }
+
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+ this.discoveryListener.set(discoveryListener);
+ }
+
+ public void setGroup(String group) {
+ }
+
+ public void start() throws Exception {
+ if( startCounter.addAndGet(1)==1 ) {
+ if( startEmbeddRegistry ) {
+ jetty = createEmbeddedJettyServer();
+ Map props = new HashMap();
+ props.put("agent", this);
+ IntrospectionSupport.setProperties(jetty, props);
+ jetty.start();
+ }
+
+ running.set(true);
+ thread = new Thread("HTTPDiscovery Agent") {
+ @Override
+ public void run() {
+ while(running.get()) {
+ try {
+ update();
+ Thread.sleep(updateInterval);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
+ }
+
+ /**
+ * Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on
+ * jetty.
+ *
+ * @return
+ * @throws Exception
+ */
+ private Service createEmbeddedJettyServer() throws Exception {
+ Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
+ return (Service)clazz.newInstance();
+ }
+
+ private void update() {
+ // Register all our services...
+ synchronized(registeredServices) {
+ for (String service : registeredServices) {
+ doRegister(service);
+ }
+ }
+
+ // Find new registered services...
+ DiscoveryListener discoveryListener = this.discoveryListener.get();
+ if(discoveryListener!=null) {
+ Set activeServices = doLookup(updateInterval*3);
+ // If there is error talking the the central server, then activeServices == null
+ if( activeServices !=null ) {
+ synchronized(discoveredServices) {
+
+ HashSet removedServices = new HashSet(discoveredServices.keySet());
+ removedServices.removeAll(activeServices);
+
+ HashSet addedServices = new HashSet(activeServices);
+ addedServices.removeAll(discoveredServices.keySet());
+ addedServices.removeAll(removedServices);
+
+ for (String service : addedServices) {
+ SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
+ discoveredServices.put(service, e);
+ discoveryListener.onServiceAdd(e);
+ }
+
+ for (String service : removedServices) {
+ SimpleDiscoveryEvent e = discoveredServices.remove(service);
+ if( e !=null ) {
+ e.removed.set(true);
+ }
+ discoveryListener.onServiceRemove(e);
+ }
+ }
+ }
+ }
+ }
+
+ public void stop() throws Exception {
+ if( startCounter.decrementAndGet()==0 ) {
+ running.set(false);
+ if( thread!=null ) {
+ thread.join(updateInterval*3);
+ thread=null;
+ }
+ if( jetty!=null ) {
+ jetty.stop();
+ jetty = null;
+ }
+ }
+ }
+
+ public String getRegistryURL() {
+ return registryURL;
+ }
+
+ public void setRegistryURL(String discoveryRegistryURL) {
+ this.registryURL = discoveryRegistryURL;
+ }
+
+ public long getUpdateInterval() {
+ return updateInterval;
+ }
+
+ public void setUpdateInterval(long updateInterval) {
+ this.updateInterval = updateInterval;
+ }
+
+ public boolean isStartEmbeddRegistry() {
+ return startEmbeddRegistry;
+ }
+
+ public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
+ this.startEmbeddRegistry = startEmbeddRegistry;
+ }
+
+}
diff --git a/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java
new file mode 100755
index 0000000000..45c50bc9e3
--- /dev/null
+++ b/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+
+public class HTTPDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+ protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
+ try {
+
+ Map options = URISupport.parseParamters(uri);
+ uri = URISupport.removeQuery(uri);
+
+ HTTPDiscoveryAgent rc = new HTTPDiscoveryAgent();
+ rc.setRegistryURL(uri.toString());
+
+ IntrospectionSupport.setProperties(rc, options);
+
+ return rc;
+
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
+ }
+ }
+}
diff --git a/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http b/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http
new file mode 100644
index 0000000000..f7c63deea3
--- /dev/null
+++ b/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http
@@ -0,0 +1,4 @@
+## ---------------------------------------------------------------------------
+##
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.discovery.http.HTTPDiscoveryAgentFactory
diff --git a/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt b/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt
new file mode 100644
index 0000000000..780564290e
--- /dev/null
+++ b/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt
@@ -0,0 +1,52 @@
+Usage:
+ java org.apache.activemq.store.amq.AMQJournalTool [options]* (directory) *
+
+Displays the records stored in the Journal log files used by ActiveMQ. This
+tool supports loading the journal data files from multiple directories. Normally
+it is run against the journal archive directory and the active journal directory.
+
+This tool supports controlling the output format using Velocity [1] templates.
+It also supports filtering out records using a SQL like WHERE syntax implemented
+using JoSQL.
+
+Options to control output format:
+
+Any valid Velocity Template Language (VTL) expression can be used to control the
+display of the record.
+
+ --message-format=VTL The format used to display message records. Message
+ records get created every time a producer sends a persistent message to the broker.
+ The message gets recorded in the journal even if it's transaction is rolled back.
+ Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}
+
+ --topic-ack-format=VTL The format used to display topic ack records. A topic
+ ack records that a durable subscription for a topic has acknowleged a set of messages.
+ Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}
+
+ --queue-ack-format=VTL The format used to display queue ack records. A queue
+ ack records that a consumer for a quue has acknowleged a message.
+ Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}
+
+ --transaction-format=VTL The format used to display transaction records. Transaction records
+ are used to record transaction related actions like commit and rollback.
+ Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.transactionId}
+
+ --trace-format=VTL The format used to display trace records.
+ Trace records are informational messages stored in the journal that assist in Auditing.
+ For example a trace message is recorded whenever the broker is restarted or when the
+ long term store is checkpointed.
+ Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.message}
+
+Options to control the selection of records displayed:
+ --where=VALUE The where clause used to control the records selected
+ for display. It can select on all the fields available in the velocity context.
+ example: --where="type='ActiveMQTextMessage' and location.dataFileId > 2"
+
+Other Options:
+ --help Show this help screen.
+
+Example:
+
+ java org.apache.activemq.store.amq.AMQJournalTool /path/to/archive /path/to/journal
+
+
\ No newline at end of file
diff --git a/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java b/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java
new file mode 100644
index 0000000000..3bb6491932
--- /dev/null
+++ b/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import java.io.File;
+import java.util.Set;
+
+import javax.jms.Message;
+import junit.framework.TestCase;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+
+public class AMQReaderTest extends TestCase {
+
+ public void testIterateArchive() throws Exception{
+ String resourceName = getClass().getPackage().getName() + File.separator + "data";
+ resourceName = resourceName.replace('.', File.separatorChar);
+ Resource resource = new ClassPathResource(resourceName);
+ AMQReader reader = new AMQReader(resource.getFile());
+ for (Message m:reader) {
+ assertNotNull(m);
+ }
+ }
+
+ public void xtestIterateFile() throws Exception{
+ String resourceName = getClass().getPackage().getName() + File.separator + "data";
+ resourceName = resourceName.replace('.', File.separatorChar);
+ Resource resource = new ClassPathResource(resourceName);
+ Set files = AMQReader.listDataFiles(resource.getFile());
+ assertNotNull(files);
+ assertTrue(files.size() >0);
+ for (File file: files) {
+ System.err.println("READING " + file);
+ AMQReader reader = new AMQReader(file);
+ for (Message m:reader) {
+ assertNotNull(m);
+ }
+ }
+ }
+
+}
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1
new file mode 100644
index 0000000000..bbae0c2e84
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2
new file mode 100644
index 0000000000..4c489052c1
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3
new file mode 100644
index 0000000000..cff5040f1f
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4
new file mode 100644
index 0000000000..b7fb19a3e5
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5
new file mode 100644
index 0000000000..6840d553b4
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6
new file mode 100644
index 0000000000..a3b776e833
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7
new file mode 100644
index 0000000000..9488e64d83
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8
new file mode 100644
index 0000000000..ac4cd3bced
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8 differ
diff --git a/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9 b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9
new file mode 100644
index 0000000000..88cdf8fdd7
Binary files /dev/null and b/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9 differ
diff --git a/pom.xml b/pom.xml
index 6ecc7a100b..e1625f124c 100755
--- a/pom.xml
+++ b/pom.xml
@@ -892,12 +892,32 @@
geronimo-transaction
2.1
-
-
- commons-net
- commons-net
- ${commons-net-version}
-
+
+
+
+ commons-net
+ commons-net
+ ${commons-net-version}
+
+
+
+ velocity
+ velocity
+ 1.4
+ true
+
+
+ net.sf.josql
+ josql
+ 1.5
+ true
+
+
+ net.sf.josql
+ gentlyweb-utils
+ 1.5
+ true
+