AMQ-2408 and AMQ-2407

- adding new tool to manually inspect/audit the amqPersistenceAdapter's journal files.
 - adding new HTTP based discovery agent



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@818209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-09-23 19:06:58 +00:00
parent 6e7c4a4da9
commit a0af351520
26 changed files with 1779 additions and 2 deletions

View File

@ -44,7 +44,11 @@
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-console</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
@ -153,7 +157,22 @@
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<scope>test</scope>
</dependency>
</dependency>
<dependency>
<groupId>velocity</groupId>
<artifactId>velocity</artifactId>
</dependency>
<dependency>
<groupId>net.sf.josql</groupId>
<artifactId>josql</artifactId>
</dependency>
<dependency>
<groupId>net.sf.josql</groupId>
<artifactId>gentlyweb-utils</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,353 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.File;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.Velocity;
import org.apache.velocity.app.VelocityEngine;
import org.josql.Query;
/**
* Allows you to view the contents of a Journal.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQJournalTool {
private final ArrayList<File> dirs = new ArrayList<File>();
private final WireFormat wireFormat = new OpenWireFormat();
private final HashMap<String, String> resources = new HashMap<String, String>();
private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
private String where;
private VelocityContext context;
private VelocityEngine velocity;
private boolean help;
public static void main(String[] args) throws Exception {
AMQJournalTool consumerTool = new AMQJournalTool();
String[] directories = CommandLineSupport
.setOptions(consumerTool, args);
if (directories.length < 1) {
System.out
.println("Please specify the directories with journal data to scan");
return;
}
for (int i = 0; i < directories.length; i++) {
consumerTool.getDirs().add(new File(directories[i]));
}
consumerTool.execute();
}
public void execute() throws Exception {
if( help ) {
showHelp();
return;
}
if (getDirs().size() < 1) {
System.out.println("");
System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
System.out.println("");
showHelp();
return;
}
for (File dir : getDirs()) {
if( !dir.exists() ) {
System.out.println("");
System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
System.out.println("");
showHelp();
return;
}
if( !dir.isDirectory() ) {
System.out.println("");
System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
System.out.println("");
showHelp();
return;
}
}
context = new VelocityContext();
List keys = Arrays.asList(context.getKeys());
for (Iterator iterator = System.getProperties().entrySet()
.iterator(); iterator.hasNext();) {
Map.Entry kv = (Map.Entry) iterator.next();
String name = (String) kv.getKey();
String value = (String) kv.getValue();
if (!keys.contains(name)) {
context.put(name, value);
}
}
velocity = new VelocityEngine();
velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
velocity.init();
resources.put("message", messageFormat);
resources.put("topicAck", topicAckFormat);
resources.put("queueAck", queueAckFormat);
resources.put("transaction", transactionFormat);
resources.put("trace", traceFormat);
resources.put("unknown", unknownFormat);
Query query = null;
if (where != null) {
query = new Query();
query.parse("select * from "+Entry.class.getName()+" where "+where);
}
ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
manager.start();
try {
Location curr = manager.getFirstLocation();
while (curr != null) {
ByteSequence data = manager.read(curr);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
Entry entry = new Entry();
entry.setLocation(curr);
entry.setRecord(c);
entry.setData(data);
entry.setQuery(query);
process(entry);
curr = manager.getNextLocation(curr);
}
} finally {
manager.close();
}
}
private void showHelp() {
InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
Scanner scanner = new Scanner(is);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
System.out.println(line);
}
scanner.close(); }
private void process(Entry entry) throws Exception {
Location location = entry.getLocation();
DataStructure record = entry.getRecord();
switch (record.getDataStructureType()) {
case ActiveMQMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQBytesMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQBlobMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQMapMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQObjectMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQStreamMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQTextMessage");
entry.setFormater("message");
display(entry);
break;
case JournalQueueAck.DATA_STRUCTURE_TYPE:
entry.setType("Queue Ack");
entry.setFormater("queueAck");
display(entry);
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE:
entry.setType("Topic Ack");
entry.setFormater("topicAck");
display(entry);
break;
case JournalTransaction.DATA_STRUCTURE_TYPE:
entry.setType(getType((JournalTransaction) record));
entry.setFormater("transaction");
display(entry);
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
entry.setType("Trace");
entry.setFormater("trace");
display(entry);
break;
default:
entry.setType("Unknown");
entry.setFormater("unknown");
display(entry);
break;
}
}
private String getType(JournalTransaction record) {
switch (record.getType()) {
case JournalTransaction.XA_PREPARE:
return "XA Prepare";
case JournalTransaction.XA_COMMIT:
return "XA Commit";
case JournalTransaction.XA_ROLLBACK:
return "XA Rollback";
case JournalTransaction.LOCAL_COMMIT:
return "Commit";
case JournalTransaction.LOCAL_ROLLBACK:
return "Rollback";
}
return "Unknown Transaction";
}
private void display(Entry entry) throws Exception {
if (entry.getQuery() != null) {
List list = Collections.singletonList(entry);
List results = entry.getQuery().execute(list).getResults();
if (results.isEmpty()) {
return;
}
}
CustomResourceLoader.setResources(resources);
try {
context.put("location", entry.getLocation());
context.put("record", entry.getRecord());
context.put("type", entry.getType());
if (entry.getRecord() instanceof ActiveMQMessage) {
context.put("body", new MessageBodyFormatter(
(ActiveMQMessage) entry.getRecord()));
}
Template template = velocity.getTemplate(entry.getFormater());
PrintWriter writer = new PrintWriter(System.out);
template.merge(context, writer);
writer.println();
writer.flush();
} finally {
CustomResourceLoader.setResources(null);
}
}
public void setMessageFormat(String messageFormat) {
this.messageFormat = messageFormat;
}
public void setTopicAckFormat(String ackFormat) {
this.topicAckFormat = ackFormat;
}
public void setTransactionFormat(String transactionFormat) {
this.transactionFormat = transactionFormat;
}
public void setTraceFormat(String traceFormat) {
this.traceFormat = traceFormat;
}
public void setUnknownFormat(String unknownFormat) {
this.unknownFormat = unknownFormat;
}
public void setQueueAckFormat(String queueAckFormat) {
this.queueAckFormat = queueAckFormat;
}
public String getQuery() {
return where;
}
public void setWhere(String query) {
this.where = query;
}
public boolean isHelp() {
return help;
}
public void setHelp(boolean help) {
this.help = help;
}
/**
* @return the dirs
*/
public ArrayList<File> getDirs() {
return dirs;
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.File;
import java.util.List;
import org.apache.activemq.console.CommandContext;
import org.apache.activemq.console.command.Command;
public class AMQJournalToolCommand implements Command {
private CommandContext context;
public void execute(List<String> tokens) throws Exception {
AMQJournalTool consumerTool = new AMQJournalTool();
String args[] = new String[tokens.size()];
tokens.toArray(args);
String[] directories = CommandLineSupport.setOptions(consumerTool, args);
for (int i = 0; i < directories.length; i++) {
consumerTool.getDirs().add(new File(directories[i]));
}
consumerTool.execute();
}
public void setCommandContext(CommandContext context) {
this.context = context;
}
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.util.ArrayList;
import org.apache.activemq.util.IntrospectionSupport;
/**
* Helper utility that can be used to set the properties on any object using
* command line arguments.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public final class CommandLineSupport {
private CommandLineSupport() {
}
/**
* Sets the properties of an object given the command line args.
*
* if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
*
* then it will try to call the following setters on the target object.
*
* target.setAckMode("AUTO");
* target.setURL(new URI("tcp://localhost:61616") );
* target.setPersistent(true);
*
* Notice the the proper conversion for the argument is determined by examining the
* setter arguement type.
*
* @param target the object that will have it's properties set
* @param args the commline options
* @return any arguments that are not valid options for the target
*/
public static String[] setOptions(Object target, String[] args) {
ArrayList<String> rc = new ArrayList<String>();
for (int i = 0; i < args.length; i++) {
if (args[i] == null) {
continue;
}
if (args[i].startsWith("--")) {
// --options without a specified value are considered boolean
// flags that are enabled.
String value = "true";
String name = args[i].substring(2);
// if --option=value case
int p = name.indexOf("=");
if (p > 0) {
value = name.substring(p + 1);
name = name.substring(0, p);
}
// name not set, then it's an unrecognized option
if (name.length() == 0) {
rc.add(args[i]);
continue;
}
String propName = convertOptionToPropertyName(name);
if (!IntrospectionSupport.setProperty(target, propName, value)) {
rc.add(args[i]);
continue;
}
} else {
rc.add(args[i]);
}
}
String r[] = new String[rc.size()];
rc.toArray(r);
return r;
}
/**
* converts strings like: test-enabled to testEnabled
*
* @param name
* @return
*/
private static String convertOptionToPropertyName(String name) {
String rc = "";
// Look for '-' and strip and then convert the subsequent char to
// uppercase
int p = name.indexOf("-");
while (p > 0) {
// strip
rc += name.substring(0, p);
name = name.substring(p + 1);
// can I convert the next char to upper?
if (name.length() > 0) {
rc += name.substring(0, 1).toUpperCase();
name = name.substring(1);
}
p = name.indexOf("-");
}
return rc + name;
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import org.apache.commons.collections.ExtendedProperties;
import org.apache.velocity.exception.ResourceNotFoundException;
import org.apache.velocity.runtime.RuntimeServices;
import org.apache.velocity.runtime.resource.Resource;
import org.apache.velocity.runtime.resource.loader.FileResourceLoader;
import org.apache.velocity.runtime.resource.loader.ResourceLoader;
public class CustomResourceLoader extends ResourceLoader {
private final static ThreadLocal<HashMap<String, String>> resourcesTL = new ThreadLocal<HashMap<String, String>>();
private final FileResourceLoader fileResourceLoader = new FileResourceLoader();
@Override
public void commonInit(RuntimeServices rs, ExtendedProperties configuration) {
super.commonInit(rs, configuration);
fileResourceLoader.commonInit(rs, configuration);
}
public void init( ExtendedProperties configuration)
{
fileResourceLoader.init(configuration);
}
/**
*/
public synchronized InputStream getResourceStream( String name )
throws ResourceNotFoundException
{
InputStream result = null;
if (name == null || name.length() == 0)
{
throw new ResourceNotFoundException ("No template name provided");
}
String value = null;
HashMap<String, String> resources = resourcesTL.get();
if( resources!=null ) {
value = resources.get(name);
}
if( value == null ) {
result = this.fileResourceLoader.getResourceStream(name);
} else {
try
{
result = new ByteArrayInputStream(value.getBytes());
}
catch( Exception e )
{
throw new ResourceNotFoundException( e.getMessage() );
}
}
return result;
}
public boolean isSourceModified(Resource resource)
{
return false;
}
public long getLastModified(Resource resource)
{
return 0;
}
static public HashMap<String, String> getResources() {
return resourcesTL.get();
}
static public void setResources(HashMap<String, String> arg0) {
resourcesTL.set(arg0);
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.josql.Query;
public class Entry {
Location location;
DataStructure record;
private ByteSequence data;
private String type;
private String formater;
private Query query;
public Location getLocation() {
return location;
}
public void setLocation(Location location) {
this.location = location;
}
public DataStructure getRecord() {
return record;
}
public void setRecord(DataStructure record) {
this.record = record;
}
public void setData(ByteSequence data) {
this.data = data;
}
public void setType(String type) {
this.type = type;
}
public ByteSequence getData() {
return data;
}
public String getType() {
return type;
}
public void setFormater(String formater) {
this.formater = formater;
}
public String getFormater() {
return formater;
}
public void setQuery(Query query) {
this.query = query;
}
public Query getQuery() {
return query;
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.ByteSequence;
public class MessageBodyFormatter {
final ActiveMQMessage message;
public MessageBodyFormatter(ActiveMQMessage message) {
this.message=message;
}
@Override
public String toString() {
try {
switch (message.getDataStructureType()) {
case ActiveMQMessage.DATA_STRUCTURE_TYPE:
return "";
case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
ActiveMQBlobMessage blob = (ActiveMQBlobMessage) message;
return blob.getRemoteBlobUrl();
case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
ActiveMQMapMessage map = (ActiveMQMapMessage)message;
return map.getContentMap().toString();
case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
ActiveMQTextMessage text = (ActiveMQTextMessage)message;
return text.getText();
case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
ByteSequence data = message.getContent();
return "binary payload {length="+data.getLength()+", compressed="+message.isCompressed()+"}";
}
} catch (JMSException e) {
}
return "";
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq.reader;
import java.util.Iterator;
import javax.jms.Message;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
* An Iterator for the AMQReader
*
*/
class AMQIterator implements Iterator<Message>{
private AMQReader reader;
private BooleanExpression expression;
private MessageLocation currentLocation;
private MessageLocation nextLocation;
private boolean valid=true;
AMQIterator(AMQReader reader, BooleanExpression expression){
this.reader=reader;
this.expression=expression;
}
public boolean hasNext() {
try {
this.nextLocation = reader.getNextMessage(currentLocation);
Message next = nextLocation != null ? nextLocation.getMessage()
: null;
if (expression == null) {
return next != null;
} else {
while (next != null) {
MessageEvaluationContext context = new MessageEvaluationContext();
context.setMessageReference((MessageReference) next);
if (expression.matches(context)) {
return true;
}
this.nextLocation = reader.getNextMessage(currentLocation);
next = nextLocation != null ? nextLocation.getMessage()
: null;
}
valid=false;
return false;
}
} catch (Exception e) {
throw new RuntimeException(
"Failed to get next message from reader ", e);
}
}
public Message next() {
if (valid && (nextLocation != null || hasNext())) {
this.currentLocation=nextLocation;
return nextLocation.getMessage();
}
return null;
}
public void remove() {
throw new IllegalStateException("Not supported");
}
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq.reader;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.InvalidSelectorException;
import javax.jms.Message;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Reads and iterates through data log files for the AMQMessage Store
*
*/
public class AMQReader implements Iterable<Message> {
private AsyncDataManager dataManager;
private WireFormat wireFormat = new OpenWireFormat();
private File file;
private BooleanExpression expression;
/**
* List all the data files in a directory
* @param directory
* @return
* @throws IOException
*/
public static Set<File> listDataFiles(File directory) throws IOException{
Set<File>result = new HashSet<File>();
if (directory == null || !directory.exists() || !directory.isDirectory()) {
throw new IOException("Invalid Directory " + directory);
}
AsyncDataManager dataManager = new AsyncDataManager();
dataManager.setDirectory(directory);
dataManager.start();
Set<File> set = dataManager.getFiles();
if (set != null) {
result.addAll(set);
}
dataManager.close();
return result;
}
/**
* Create the AMQReader to read a directory of amq data logs - or an
* individual data log file
*
* @param file the directory - or file
* @throws IOException
* @throws InvalidSelectorException
* @throws IOException
* @throws InvalidSelectorException
*/
public AMQReader(File file) throws InvalidSelectorException, IOException {
this(file,null);
}
/**
* Create the AMQReader to read a directory of amq data logs - or an
* individual data log file
*
* @param file the directory - or file
* @param selector the JMS selector or null to select all
* @throws IOException
* @throws InvalidSelectorException
*/
public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
String str = selector != null ? selector.trim() : null;
if (str != null && str.length() > 0) {
this.expression=SelectorParser.parse(str);
}
dataManager = new AsyncDataManager();
dataManager.setArchiveDataLogs(false);
if (file.isDirectory()) {
dataManager.setDirectory(file);
} else {
dataManager.setDirectory(file.getParentFile());
dataManager.setDirectoryArchive(file);
this.file = file;
}
dataManager.start();
}
public Iterator<Message> iterator() {
return new AMQIterator(this,this.expression);
}
protected MessageLocation getNextMessage(MessageLocation lastLocation)
throws IllegalStateException, IOException {
if (this.file != null) {
return getInternalNextMessage(this.file, lastLocation);
}
return getInternalNextMessage(lastLocation);
}
private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
throws IllegalStateException, IOException {
return getInternalNextMessage(null, lastLocation);
}
private MessageLocation getInternalNextMessage(File file,
MessageLocation lastLocation) throws IllegalStateException,
IOException {
MessageLocation result = lastLocation;
if (result != null) {
result.setMessage(null);
}
Message message = null;
Location pos = lastLocation != null ? lastLocation.getLocation() : null;
while ((pos = getNextLocation(file, pos)) != null) {
message = getMessage(pos);
if (message != null) {
if (result == null) {
result = new MessageLocation();
}
result.setMessage(message);
break;
}
}
result.setLocation(pos);
if (pos == null && message == null) {
result = null;
} else {
result.setLocation(pos);
}
return result;
}
private Location getNextLocation(File file, Location last)
throws IllegalStateException, IOException {
if (file != null) {
return dataManager.getNextLocation(file, last, true);
}
return dataManager.getNextLocation(last);
}
private Message getMessage(Location location) throws IOException {
ByteSequence data = dataManager.read(location);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
if (c instanceof Message) {
return (Message) c;
}
return null;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq.reader;
import javax.jms.Message;
import org.apache.activemq.kaha.impl.async.Location;
/**
* A holder for a message
*
*/
class MessageLocation {
private Message message;
private Location location;
/**
* @return the location
*/
public Location getLocation() {
return location;
}
/**
* @param location
*/
public void setLocation(Location location) {
this.location = location;
}
/**
* @return the message
*/
public Message getMessage() {
return message;
}
/**
* @param message
*/
public void setMessage(Message message) {
this.message = message;
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery.http;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DiscoveryRegistryServlet extends HttpServlet {
private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
long maxKeepAge = 1000*60*60; // 1 hour.
ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> serviceGroups = new ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>();
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String group = req.getPathInfo();
String service = req.getHeader("service");
LOG.debug("Registering: group="+group+", service="+service);
ConcurrentHashMap<String, Long> services = getServiceGroup(group);
services.put(service, System.currentTimeMillis());
}
private ConcurrentHashMap<String, Long> getServiceGroup(String group) {
ConcurrentHashMap<String, Long> rc = serviceGroups.get(group);
if( rc == null ) {
rc = new ConcurrentHashMap<String, Long>();
serviceGroups.put(group, rc);
}
return rc;
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try {
long freshness = 1000*30;
String p = req.getParameter("freshness");
if( p!=null ) {
freshness = Long.parseLong(p);
}
String group = req.getPathInfo();
LOG.debug("group="+group);
ConcurrentHashMap<String, Long> services = getServiceGroup(group);
PrintWriter writer = resp.getWriter();
long now = System.currentTimeMillis();
long dropTime = now-maxKeepAge;
long minimumTime = now-freshness;
ArrayList<String> dropList = new ArrayList<String>();
for (Map.Entry<String, Long> entry : services.entrySet()) {
if( entry.getValue() > minimumTime ) {
writer.println(entry.getKey());
} else if( entry.getValue() < dropTime ) {
dropList.add(entry.getKey());
}
}
// We might as well get rid of the really old entries.
for (String service : dropList) {
services.remove(service);
}
} catch (Exception e) {
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error occured: "+e);
}
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String group = req.getPathInfo();
String service = req.getHeader("service");
LOG.debug("Unregistering: group="+group+", service="+service);
ConcurrentHashMap<String, Long> services = getServiceGroup(group);
services.remove(service);
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery.http;
import java.net.URI;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
public class EmbeddedJettyServer implements org.apache.activemq.Service {
private HTTPDiscoveryAgent agent;
private Server server;
private SelectChannelConnector connector;
private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet();
public void start() throws Exception {
URI uri = new URI(agent.getRegistryURL());
server = new Server();
Context context = new Context(Context.NO_SECURITY | Context.NO_SESSIONS);
context.setContextPath("/");
ServletHolder holder = new ServletHolder();
holder.setServlet(camelServlet);
context.addServlet(holder, "/*");
server.setHandler(context);
server.start();
int port = 80;
if( uri.getPort() >=0 ) {
port = uri.getPort();
}
connector = new SelectChannelConnector();
connector.setPort(port);
server.addConnector(connector);
connector.start();
}
public void stop() throws Exception {
if( connector!=null ) {
connector.stop();
connector = null;
}
if( server!=null ) {
server.stop();
server = null;
}
}
public HTTPDiscoveryAgent getAgent() {
return agent;
}
public void setAgent(HTTPDiscoveryAgent agent) {
this.agent = agent;
}
}

View File

@ -0,0 +1,349 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery.http;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.Service;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class HTTPDiscoveryAgent implements DiscoveryAgent {
private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
private String registryURL = "http://localhost:8080/discovery-registry/default";
private HttpClient httpClient = new HttpClient();
private AtomicBoolean running=new AtomicBoolean();
private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
private final HashSet<String> registeredServices = new HashSet<String>();
private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();
private Thread thread;
private long updateInterval = 1000*10;
private String brokerName;
private boolean startEmbeddRegistry=false;
private Service jetty;
private AtomicInteger startCounter=new AtomicInteger(0);
private long initialReconnectDelay = 1000;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private boolean useExponentialBackOff=true;
private int maxReconnectAttempts;
private final Object sleepMutex = new Object();
private long minConnectTime = 5000;
class SimpleDiscoveryEvent extends DiscoveryEvent {
private int connectFailures;
private long reconnectDelay = initialReconnectDelay;
private long connectTime = System.currentTimeMillis();
private AtomicBoolean failed = new AtomicBoolean(false);
private AtomicBoolean removed = new AtomicBoolean(false);
public SimpleDiscoveryEvent(String service) {
super(service);
}
}
public String getGroup() {
return null;
}
public void registerService(String service) throws IOException {
synchronized(registeredServices) {
registeredServices.add(service);
}
doRegister(service);
}
synchronized private void doRegister(String service) {
String url = registryURL;
try {
PutMethod method = new PutMethod(url);
// method.setParams(createParams());
method.setRequestHeader("service", service);
int responseCode = httpClient.executeMethod(method);
LOG.debug("PUT to "+url+" got a "+responseCode);
} catch (Exception e) {
LOG.debug("PUT to "+url+" failed with: "+e);
}
}
synchronized private void doUnRegister(String service) {
String url = registryURL;
try {
DeleteMethod method = new DeleteMethod(url);
// method.setParams(createParams());
method.setRequestHeader("service", service);
int responseCode = httpClient.executeMethod(method);
LOG.debug("DELETE to "+url+" got a "+responseCode);
} catch (Exception e) {
LOG.debug("DELETE to "+url+" failed with: "+e);
}
}
// private HttpMethodParams createParams() {
// HttpMethodParams params = new HttpMethodParams();
// params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false));
// return params;
// }
synchronized private Set<String> doLookup(long freshness) {
String url = registryURL+"?freshness="+freshness;
try {
GetMethod method = new GetMethod(url);
// method.setParams(createParams());
int responseCode = httpClient.executeMethod(method);
LOG.debug("GET to "+url+" got a "+responseCode);
if( responseCode == 200 ) {
Set<String> rc = new HashSet<String>();
Scanner scanner = new Scanner(method.getResponseBodyAsStream());
while( scanner.hasNextLine() ) {
String service = scanner.nextLine();
if( service.trim().length() != 0 ) {
rc.add(service);
}
}
return rc;
} else {
LOG.debug("GET to "+url+" failed with response code: "+responseCode);
return null;
}
} catch (Exception e) {
LOG.debug("GET to "+url+" failed with: "+e);
return null;
}
}
public void serviceFailed(DiscoveryEvent devent) throws IOException {
final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
if (event.failed.compareAndSet(false, true)) {
discoveryListener.get().onServiceRemove(event);
if(!event.removed.get()) {
// Setup a thread to re-raise the event...
Thread thread = new Thread() {
public void run() {
// We detect a failed connection attempt because the service
// fails right away.
if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event);
event.connectFailures++;
if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled.");
return;
}
synchronized (sleepMutex) {
try {
if (!running.get() || event.removed.get()) {
return;
}
LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
if (!useExponentialBackOff) {
event.reconnectDelay = initialReconnectDelay;
} else {
// Exponential increment of reconnect delay.
event.reconnectDelay *= backOffMultiplier;
if (event.reconnectDelay > maxReconnectDelay) {
event.reconnectDelay = maxReconnectDelay;
}
}
} else {
event.connectFailures = 0;
event.reconnectDelay = initialReconnectDelay;
}
if (!running.get() || event.removed.get()) {
return;
}
event.connectTime = System.currentTimeMillis();
event.failed.set(false);
discoveryListener.get().onServiceAdd(event);
}
};
thread.setDaemon(true);
thread.start();
}
}
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public void setDiscoveryListener(DiscoveryListener discoveryListener) {
this.discoveryListener.set(discoveryListener);
}
public void setGroup(String group) {
}
public void start() throws Exception {
if( startCounter.addAndGet(1)==1 ) {
if( startEmbeddRegistry ) {
jetty = createEmbeddedJettyServer();
Map props = new HashMap();
props.put("agent", this);
IntrospectionSupport.setProperties(jetty, props);
jetty.start();
}
running.set(true);
thread = new Thread("HTTPDiscovery Agent") {
@Override
public void run() {
while(running.get()) {
try {
update();
Thread.sleep(updateInterval);
} catch (InterruptedException e) {
return;
}
}
}
};
thread.setDaemon(true);
thread.start();
}
}
/**
* Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on
* jetty.
*
* @return
* @throws Exception
*/
private Service createEmbeddedJettyServer() throws Exception {
Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
return (Service)clazz.newInstance();
}
private void update() {
// Register all our services...
synchronized(registeredServices) {
for (String service : registeredServices) {
doRegister(service);
}
}
// Find new registered services...
DiscoveryListener discoveryListener = this.discoveryListener.get();
if(discoveryListener!=null) {
Set<String> activeServices = doLookup(updateInterval*3);
// If there is error talking the the central server, then activeServices == null
if( activeServices !=null ) {
synchronized(discoveredServices) {
HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet());
removedServices.removeAll(activeServices);
HashSet<String> addedServices = new HashSet<String>(activeServices);
addedServices.removeAll(discoveredServices.keySet());
addedServices.removeAll(removedServices);
for (String service : addedServices) {
SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
discoveredServices.put(service, e);
discoveryListener.onServiceAdd(e);
}
for (String service : removedServices) {
SimpleDiscoveryEvent e = discoveredServices.remove(service);
if( e !=null ) {
e.removed.set(true);
}
discoveryListener.onServiceRemove(e);
}
}
}
}
}
public void stop() throws Exception {
if( startCounter.decrementAndGet()==0 ) {
running.set(false);
if( thread!=null ) {
thread.join(updateInterval*3);
thread=null;
}
if( jetty!=null ) {
jetty.stop();
jetty = null;
}
}
}
public String getRegistryURL() {
return registryURL;
}
public void setRegistryURL(String discoveryRegistryURL) {
this.registryURL = discoveryRegistryURL;
}
public long getUpdateInterval() {
return updateInterval;
}
public void setUpdateInterval(long updateInterval) {
this.updateInterval = updateInterval;
}
public boolean isStartEmbeddRegistry() {
return startEmbeddRegistry;
}
public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
this.startEmbeddRegistry = startEmbeddRegistry;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery.http;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
public class HTTPDiscoveryAgentFactory extends DiscoveryAgentFactory {
protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
try {
Map options = URISupport.parseParamters(uri);
uri = URISupport.removeQuery(uri);
HTTPDiscoveryAgent rc = new HTTPDiscoveryAgent();
rc.setRegistryURL(uri.toString());
IntrospectionSupport.setProperties(rc, options);
return rc;
} catch (Throwable e) {
throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
}
}
}

View File

@ -0,0 +1,4 @@
## ---------------------------------------------------------------------------
##
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.discovery.http.HTTPDiscoveryAgentFactory

View File

@ -0,0 +1,52 @@
Usage:
java org.apache.activemq.store.amq.AMQJournalTool [options]* (directory) *
Displays the records stored in the Journal log files used by ActiveMQ. This
tool supports loading the journal data files from multiple directories. Normally
it is run against the journal archive directory and the active journal directory.
This tool supports controlling the output format using Velocity [1] templates.
It also supports filtering out records using a SQL like WHERE syntax implemented
using JoSQL.
Options to control output format:
Any valid Velocity Template Language (VTL) expression can be used to control the
display of the record.
--message-format=VTL The format used to display message records. Message
records get created every time a producer sends a persistent message to the broker.
The message gets recorded in the journal even if it's transaction is rolled back.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}
--topic-ack-format=VTL The format used to display topic ack records. A topic
ack records that a durable subscription for a topic has acknowleged a set of messages.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}
--queue-ack-format=VTL The format used to display queue ack records. A queue
ack records that a consumer for a quue has acknowleged a message.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}
--transaction-format=VTL The format used to display transaction records. Transaction records
are used to record transaction related actions like commit and rollback.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.transactionId}
--trace-format=VTL The format used to display trace records.
Trace records are informational messages stored in the journal that assist in Auditing.
For example a trace message is recorded whenever the broker is restarted or when the
long term store is checkpointed.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.message}
Options to control the selection of records displayed:
--where=VALUE The where clause used to control the records selected
for display. It can select on all the fields available in the velocity context.
example: --where="type='ActiveMQTextMessage' and location.dataFileId > 2"
Other Options:
--help Show this help screen.
Example:
java org.apache.activemq.store.amq.AMQJournalTool /path/to/archive /path/to/journal

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.amq.reader;
import java.io.File;
import java.util.Set;
import javax.jms.Message;
import junit.framework.TestCase;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
public class AMQReaderTest extends TestCase {
public void testIterateArchive() throws Exception{
String resourceName = getClass().getPackage().getName() + File.separator + "data";
resourceName = resourceName.replace('.', File.separatorChar);
Resource resource = new ClassPathResource(resourceName);
AMQReader reader = new AMQReader(resource.getFile());
for (Message m:reader) {
assertNotNull(m);
}
}
public void xtestIterateFile() throws Exception{
String resourceName = getClass().getPackage().getName() + File.separator + "data";
resourceName = resourceName.replace('.', File.separatorChar);
Resource resource = new ClassPathResource(resourceName);
Set<File> files = AMQReader.listDataFiles(resource.getFile());
assertNotNull(files);
assertTrue(files.size() >0);
for (File file: files) {
System.err.println("READING " + file);
AMQReader reader = new AMQReader(file);
for (Message m:reader) {
assertNotNull(m);
}
}
}
}