git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@829066 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-10-23 14:00:02 +00:00
parent 5074fefc09
commit e0ee61348e
13 changed files with 1145 additions and 1 deletions

View File

@ -80,6 +80,22 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>velocity</groupId>
<artifactId>velocity</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>net.sf.josql</groupId>
<artifactId>josql</artifactId>
</dependency>
<dependency>
<groupId>net.sf.josql</groupId>
<artifactId>gentlyweb-utils</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.activemq.console.CommandContext;
import org.apache.activemq.console.command.store.amq.AMQJournalToolCommand;
import org.apache.activemq.console.formatter.CommandShellOutputFormatter;
public class ShellCommand extends AbstractCommand {
@ -41,10 +42,12 @@ public class ShellCommand extends AbstractCommand {
"",
"Tasks (default task is start):",
" start - Creates and starts a broker using a configuration file, or a broker URI.",
" create - Creates a runnable broker instance in the specified path",
" stop - Stops a running broker specified by the broker name.",
" list - Lists all available brokers in the specified JMX context.",
" query - Display selected broker component's attributes and statistics.",
" browse - Display selected messages in a specified destination.",
" journal-audit - Allows you to view records stored in the persistent journal.",
"",
"Task Options (Options specific to each task):",
" --extdir <dir> - Add the jar files in the directory to the classpath.",
@ -106,6 +109,8 @@ public class ShellCommand extends AbstractCommand {
String taskToken = (String)tokens.remove(0);
if (taskToken.equals("start")) {
command = new StartCommand();
} else if (taskToken.equals("create")) {
command = new CreateCommand();
} else if (taskToken.equals("stop")) {
command = new ShutdownCommand();
} else if (taskToken.equals("list")) {
@ -118,6 +123,8 @@ public class ShellCommand extends AbstractCommand {
command = new AmqBrowseCommand();
} else if (taskToken.equals("purge")) {
command = new PurgeCommand();
} else if (taskToken.equals("journal-audit")) {
command = new AMQJournalToolCommand();
} else if (taskToken.equals("help")) {
printHelp();
} else {
@ -134,7 +141,7 @@ public class ShellCommand extends AbstractCommand {
}
/**
/**
* Print the help messages for the browse command
*/
protected void printHelp() {

View File

@ -0,0 +1,353 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq;
import java.io.File;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.velocity.Template;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.Velocity;
import org.apache.velocity.app.VelocityEngine;
import org.josql.Query;
/**
* Allows you to view the contents of a Journal.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AMQJournalTool {
private final ArrayList<File> dirs = new ArrayList<File>();
private final WireFormat wireFormat = new OpenWireFormat();
private final HashMap<String, String> resources = new HashMap<String, String>();
private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
private String where;
private VelocityContext context;
private VelocityEngine velocity;
private boolean help;
public static void main(String[] args) throws Exception {
AMQJournalTool consumerTool = new AMQJournalTool();
String[] directories = CommandLineSupport
.setOptions(consumerTool, args);
if (directories.length < 1) {
System.out
.println("Please specify the directories with journal data to scan");
return;
}
for (int i = 0; i < directories.length; i++) {
consumerTool.getDirs().add(new File(directories[i]));
}
consumerTool.execute();
}
public void execute() throws Exception {
if( help ) {
showHelp();
return;
}
if (getDirs().size() < 1) {
System.out.println("");
System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
System.out.println("");
showHelp();
return;
}
for (File dir : getDirs()) {
if( !dir.exists() ) {
System.out.println("");
System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
System.out.println("");
showHelp();
return;
}
if( !dir.isDirectory() ) {
System.out.println("");
System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
System.out.println("");
showHelp();
return;
}
}
context = new VelocityContext();
List keys = Arrays.asList(context.getKeys());
for (Iterator iterator = System.getProperties().entrySet()
.iterator(); iterator.hasNext();) {
Map.Entry kv = (Map.Entry) iterator.next();
String name = (String) kv.getKey();
String value = (String) kv.getValue();
if (!keys.contains(name)) {
context.put(name, value);
}
}
velocity = new VelocityEngine();
velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
velocity.init();
resources.put("message", messageFormat);
resources.put("topicAck", topicAckFormat);
resources.put("queueAck", queueAckFormat);
resources.put("transaction", transactionFormat);
resources.put("trace", traceFormat);
resources.put("unknown", unknownFormat);
Query query = null;
if (where != null) {
query = new Query();
query.parse("select * from "+Entry.class.getName()+" where "+where);
}
ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
manager.start();
try {
Location curr = manager.getFirstLocation();
while (curr != null) {
ByteSequence data = manager.read(curr);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
Entry entry = new Entry();
entry.setLocation(curr);
entry.setRecord(c);
entry.setData(data);
entry.setQuery(query);
process(entry);
curr = manager.getNextLocation(curr);
}
} finally {
manager.close();
}
}
private void showHelp() {
InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
Scanner scanner = new Scanner(is);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
System.out.println(line);
}
scanner.close(); }
private void process(Entry entry) throws Exception {
Location location = entry.getLocation();
DataStructure record = entry.getRecord();
switch (record.getDataStructureType()) {
case ActiveMQMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQBytesMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQBlobMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQMapMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQObjectMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQStreamMessage");
entry.setFormater("message");
display(entry);
break;
case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
entry.setType("ActiveMQTextMessage");
entry.setFormater("message");
display(entry);
break;
case JournalQueueAck.DATA_STRUCTURE_TYPE:
entry.setType("Queue Ack");
entry.setFormater("queueAck");
display(entry);
break;
case JournalTopicAck.DATA_STRUCTURE_TYPE:
entry.setType("Topic Ack");
entry.setFormater("topicAck");
display(entry);
break;
case JournalTransaction.DATA_STRUCTURE_TYPE:
entry.setType(getType((JournalTransaction) record));
entry.setFormater("transaction");
display(entry);
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
entry.setType("Trace");
entry.setFormater("trace");
display(entry);
break;
default:
entry.setType("Unknown");
entry.setFormater("unknown");
display(entry);
break;
}
}
private String getType(JournalTransaction record) {
switch (record.getType()) {
case JournalTransaction.XA_PREPARE:
return "XA Prepare";
case JournalTransaction.XA_COMMIT:
return "XA Commit";
case JournalTransaction.XA_ROLLBACK:
return "XA Rollback";
case JournalTransaction.LOCAL_COMMIT:
return "Commit";
case JournalTransaction.LOCAL_ROLLBACK:
return "Rollback";
}
return "Unknown Transaction";
}
private void display(Entry entry) throws Exception {
if (entry.getQuery() != null) {
List list = Collections.singletonList(entry);
List results = entry.getQuery().execute(list).getResults();
if (results.isEmpty()) {
return;
}
}
CustomResourceLoader.setResources(resources);
try {
context.put("location", entry.getLocation());
context.put("record", entry.getRecord());
context.put("type", entry.getType());
if (entry.getRecord() instanceof ActiveMQMessage) {
context.put("body", new MessageBodyFormatter(
(ActiveMQMessage) entry.getRecord()));
}
Template template = velocity.getTemplate(entry.getFormater());
PrintWriter writer = new PrintWriter(System.out);
template.merge(context, writer);
writer.println();
writer.flush();
} finally {
CustomResourceLoader.setResources(null);
}
}
public void setMessageFormat(String messageFormat) {
this.messageFormat = messageFormat;
}
public void setTopicAckFormat(String ackFormat) {
this.topicAckFormat = ackFormat;
}
public void setTransactionFormat(String transactionFormat) {
this.transactionFormat = transactionFormat;
}
public void setTraceFormat(String traceFormat) {
this.traceFormat = traceFormat;
}
public void setUnknownFormat(String unknownFormat) {
this.unknownFormat = unknownFormat;
}
public void setQueueAckFormat(String queueAckFormat) {
this.queueAckFormat = queueAckFormat;
}
public String getQuery() {
return where;
}
public void setWhere(String query) {
this.where = query;
}
public boolean isHelp() {
return help;
}
public void setHelp(boolean help) {
this.help = help;
}
/**
* @return the dirs
*/
public ArrayList<File> getDirs() {
return dirs;
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq;
import java.io.File;
import java.util.List;
import org.apache.activemq.console.CommandContext;
import org.apache.activemq.console.command.Command;
public class AMQJournalToolCommand implements Command {
private CommandContext context;
public void execute(List<String> tokens) throws Exception {
AMQJournalTool consumerTool = new AMQJournalTool();
String args[] = new String[tokens.size()];
tokens.toArray(args);
String[] directories = CommandLineSupport.setOptions(consumerTool, args);
for (int i = 0; i < directories.length; i++) {
consumerTool.getDirs().add(new File(directories[i]));
}
consumerTool.execute();
}
public void setCommandContext(CommandContext context) {
this.context = context;
}
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq;
import java.util.ArrayList;
import org.apache.activemq.util.IntrospectionSupport;
/**
* Helper utility that can be used to set the properties on any object using
* command line arguments.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public final class CommandLineSupport {
private CommandLineSupport() {
}
/**
* Sets the properties of an object given the command line args.
*
* if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
*
* then it will try to call the following setters on the target object.
*
* target.setAckMode("AUTO");
* target.setURL(new URI("tcp://localhost:61616") );
* target.setPersistent(true);
*
* Notice the the proper conversion for the argument is determined by examining the
* setter arguement type.
*
* @param target the object that will have it's properties set
* @param args the commline options
* @return any arguments that are not valid options for the target
*/
public static String[] setOptions(Object target, String[] args) {
ArrayList<String> rc = new ArrayList<String>();
for (int i = 0; i < args.length; i++) {
if (args[i] == null) {
continue;
}
if (args[i].startsWith("--")) {
// --options without a specified value are considered boolean
// flags that are enabled.
String value = "true";
String name = args[i].substring(2);
// if --option=value case
int p = name.indexOf("=");
if (p > 0) {
value = name.substring(p + 1);
name = name.substring(0, p);
}
// name not set, then it's an unrecognized option
if (name.length() == 0) {
rc.add(args[i]);
continue;
}
String propName = convertOptionToPropertyName(name);
if (!IntrospectionSupport.setProperty(target, propName, value)) {
rc.add(args[i]);
continue;
}
} else {
rc.add(args[i]);
}
}
String r[] = new String[rc.size()];
rc.toArray(r);
return r;
}
/**
* converts strings like: test-enabled to testEnabled
*
* @param name
* @return
*/
private static String convertOptionToPropertyName(String name) {
String rc = "";
// Look for '-' and strip and then convert the subsequent char to
// uppercase
int p = name.indexOf("-");
while (p > 0) {
// strip
rc += name.substring(0, p);
name = name.substring(p + 1);
// can I convert the next char to upper?
if (name.length() > 0) {
rc += name.substring(0, 1).toUpperCase();
name = name.substring(1);
}
p = name.indexOf("-");
}
return rc + name;
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import org.apache.commons.collections.ExtendedProperties;
import org.apache.velocity.exception.ResourceNotFoundException;
import org.apache.velocity.runtime.RuntimeServices;
import org.apache.velocity.runtime.resource.Resource;
import org.apache.velocity.runtime.resource.loader.FileResourceLoader;
import org.apache.velocity.runtime.resource.loader.ResourceLoader;
public class CustomResourceLoader extends ResourceLoader {
private final static ThreadLocal<HashMap<String, String>> resourcesTL = new ThreadLocal<HashMap<String, String>>();
private final FileResourceLoader fileResourceLoader = new FileResourceLoader();
@Override
public void commonInit(RuntimeServices rs, ExtendedProperties configuration) {
super.commonInit(rs, configuration);
fileResourceLoader.commonInit(rs, configuration);
}
public void init( ExtendedProperties configuration)
{
fileResourceLoader.init(configuration);
}
/**
*/
public synchronized InputStream getResourceStream( String name )
throws ResourceNotFoundException
{
InputStream result = null;
if (name == null || name.length() == 0)
{
throw new ResourceNotFoundException ("No template name provided");
}
String value = null;
HashMap<String, String> resources = resourcesTL.get();
if( resources!=null ) {
value = resources.get(name);
}
if( value == null ) {
result = this.fileResourceLoader.getResourceStream(name);
} else {
try
{
result = new ByteArrayInputStream(value.getBytes());
}
catch( Exception e )
{
throw new ResourceNotFoundException( e.getMessage() );
}
}
return result;
}
public boolean isSourceModified(Resource resource)
{
return false;
}
public long getLastModified(Resource resource)
{
return 0;
}
static public HashMap<String, String> getResources() {
return resourcesTL.get();
}
static public void setResources(HashMap<String, String> arg0) {
resourcesTL.set(arg0);
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.josql.Query;
public class Entry {
Location location;
DataStructure record;
private ByteSequence data;
private String type;
private String formater;
private Query query;
public Location getLocation() {
return location;
}
public void setLocation(Location location) {
this.location = location;
}
public DataStructure getRecord() {
return record;
}
public void setRecord(DataStructure record) {
this.record = record;
}
public void setData(ByteSequence data) {
this.data = data;
}
public void setType(String type) {
this.type = type;
}
public ByteSequence getData() {
return data;
}
public String getType() {
return type;
}
public void setFormater(String formater) {
this.formater = formater;
}
public String getFormater() {
return formater;
}
public void setQuery(Query query) {
this.query = query;
}
public Query getQuery() {
return query;
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.ByteSequence;
public class MessageBodyFormatter {
final ActiveMQMessage message;
public MessageBodyFormatter(ActiveMQMessage message) {
this.message=message;
}
@Override
public String toString() {
try {
switch (message.getDataStructureType()) {
case ActiveMQMessage.DATA_STRUCTURE_TYPE:
return "";
case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
ActiveMQBlobMessage blob = (ActiveMQBlobMessage) message;
return blob.getRemoteBlobUrl();
case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
ActiveMQMapMessage map = (ActiveMQMapMessage)message;
return map.getContentMap().toString();
case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
ActiveMQTextMessage text = (ActiveMQTextMessage)message;
return text.getText();
case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
ByteSequence data = message.getContent();
return "binary payload {length="+data.getLength()+", compressed="+message.isCompressed()+"}";
}
} catch (JMSException e) {
}
return "";
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq.reader;
import java.util.Iterator;
import javax.jms.InvalidSelectorException;
import javax.jms.Message;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
/**
* An Iterator for the AMQReader
*
*/
class AMQIterator implements Iterator<Message>{
private AMQReader reader;
private BooleanExpression expression;
private MessageLocation currentLocation;
private MessageLocation nextLocation;
private boolean valid=true;
AMQIterator(AMQReader reader, BooleanExpression expression){
this.reader=reader;
this.expression=expression;
}
public boolean hasNext() {
try {
this.nextLocation = reader.getNextMessage(currentLocation);
Message next = nextLocation != null ? nextLocation.getMessage()
: null;
if (expression == null) {
return next != null;
} else {
while (next != null) {
MessageEvaluationContext context = new MessageEvaluationContext();
context.setMessageReference((MessageReference) next);
if (expression.matches(context)) {
return true;
}
this.nextLocation = reader.getNextMessage(currentLocation);
next = nextLocation != null ? nextLocation.getMessage()
: null;
}
valid=false;
return false;
}
} catch (Exception e) {
throw new RuntimeException(
"Failed to get next message from reader ", e);
}
}
public Message next() {
if (valid && (nextLocation != null || hasNext())) {
this.currentLocation=nextLocation;
return nextLocation.getMessage();
}
return null;
}
public void remove() {
throw new IllegalStateException("Not supported");
}
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq.reader;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.InvalidSelectorException;
import javax.jms.Message;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Reads and iterates through data log files for the AMQMessage Store
*
*/
public class AMQReader implements Iterable<Message> {
private AsyncDataManager dataManager;
private WireFormat wireFormat = new OpenWireFormat();
private File file;
private BooleanExpression expression;
/**
* List all the data files in a directory
* @param directory
* @return
* @throws IOException
*/
public static Set<File> listDataFiles(File directory) throws IOException{
Set<File>result = new HashSet<File>();
if (directory == null || !directory.exists() || !directory.isDirectory()) {
throw new IOException("Invalid Directory " + directory);
}
AsyncDataManager dataManager = new AsyncDataManager();
dataManager.setDirectory(directory);
dataManager.start();
Set<File> set = dataManager.getFiles();
if (set != null) {
result.addAll(set);
}
dataManager.close();
return result;
}
/**
* Create the AMQReader to read a directory of amq data logs - or an
* individual data log file
*
* @param file the directory - or file
* @throws IOException
* @throws InvalidSelectorException
* @throws IOException
* @throws InvalidSelectorException
*/
public AMQReader(File file) throws InvalidSelectorException, IOException {
this(file,null);
}
/**
* Create the AMQReader to read a directory of amq data logs - or an
* individual data log file
*
* @param file the directory - or file
* @param selector the JMS selector or null to select all
* @throws IOException
* @throws InvalidSelectorException
*/
public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
String str = selector != null ? selector.trim() : null;
if (str != null && str.length() > 0) {
this.expression=SelectorParser.parse(str);
}
dataManager = new AsyncDataManager();
dataManager.setArchiveDataLogs(false);
if (file.isDirectory()) {
dataManager.setDirectory(file);
} else {
dataManager.setDirectory(file.getParentFile());
dataManager.setDirectoryArchive(file);
this.file = file;
}
dataManager.start();
}
public Iterator<Message> iterator() {
return new AMQIterator(this,this.expression);
}
protected MessageLocation getNextMessage(MessageLocation lastLocation)
throws IllegalStateException, IOException {
if (this.file != null) {
return getInternalNextMessage(this.file, lastLocation);
}
return getInternalNextMessage(lastLocation);
}
private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
throws IllegalStateException, IOException {
return getInternalNextMessage(null, lastLocation);
}
private MessageLocation getInternalNextMessage(File file,
MessageLocation lastLocation) throws IllegalStateException,
IOException {
MessageLocation result = lastLocation;
if (result != null) {
result.setMessage(null);
}
Message message = null;
Location pos = lastLocation != null ? lastLocation.getLocation() : null;
while ((pos = getNextLocation(file, pos)) != null) {
message = getMessage(pos);
if (message != null) {
if (result == null) {
result = new MessageLocation();
}
result.setMessage(message);
break;
}
}
result.setLocation(pos);
if (pos == null && message == null) {
result = null;
} else {
result.setLocation(pos);
}
return result;
}
private Location getNextLocation(File file, Location last)
throws IllegalStateException, IOException {
if (file != null) {
return dataManager.getNextLocation(file, last, true);
}
return dataManager.getNextLocation(last);
}
private Message getMessage(Location location) throws IOException {
ByteSequence data = dataManager.read(location);
DataStructure c = (DataStructure) wireFormat.unmarshal(data);
if (c instanceof Message) {
return (Message) c;
}
return null;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store.amq.reader;
import javax.jms.Message;
import org.apache.activemq.kaha.impl.async.Location;
/**
* A holder for a message
*
*/
class MessageLocation {
private Message message;
private Location location;
/**
* @return the location
*/
public Location getLocation() {
return location;
}
/**
* @param location
*/
public void setLocation(Location location) {
this.location = location;
}
/**
* @return the message
*/
public Message getMessage() {
return message;
}
/**
* @param message
*/
public void setMessage(Message message) {
this.message = message;
}
}

View File

@ -0,0 +1,52 @@
Usage:
java org.apache.activemq.console.command.store.amq.AMQJournalTool [options]* (directory) *
Displays the records stored in the Journal log files used by ActiveMQ. This
tool supports loading the journal data files from multiple directories. Normally
it is run against the journal archive directory and the active journal directory.
This tool supports controlling the output format using Velocity [1] templates.
It also supports filtering out records using a SQL like WHERE syntax implemented
using JoSQL.
Options to control output format:
Any valid Velocity Template Language (VTL) expression can be used to control the
display of the record.
--message-format=VTL The format used to display message records. Message
records get created every time a producer sends a persistent message to the broker.
The message gets recorded in the journal even if it's transaction is rolled back.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}
--topic-ack-format=VTL The format used to display topic ack records. A topic
ack records that a durable subscription for a topic has acknowleged a set of messages.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}
--queue-ack-format=VTL The format used to display queue ack records. A queue
ack records that a consumer for a quue has acknowleged a message.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}
--transaction-format=VTL The format used to display transaction records. Transaction records
are used to record transaction related actions like commit and rollback.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.transactionId}
--trace-format=VTL The format used to display trace records.
Trace records are informational messages stored in the journal that assist in Auditing.
For example a trace message is recorded whenever the broker is restarted or when the
long term store is checkpointed.
Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.message}
Options to control the selection of records displayed:
--where=VALUE The where clause used to control the records selected
for display. It can select on all the fields available in the velocity context.
example: --where="type='ActiveMQTextMessage' and location.dataFileId > 2"
Other Options:
--help Show this help screen.
Example:
java org.apache.activemq.console.command.store.amq.AMQJournalTool /path/to/archive /path/to/journal

View File

@ -221,6 +221,8 @@
<include>org.codehaus.woodstox:wstx-asl</include>
<include>org.springframework.ws:spring-oxm-tiger</include>
<include>org.codehaus.jettison:jettison</include>
<include>velocity:velocity</include>
<include>net.sf.josql:josql</include>
</includes>
</dependencySet>
<dependencySet>