diff --git a/activemq-console/pom.xml b/activemq-console/pom.xml index 4aeab8aa8c..0158d80737 100644 --- a/activemq-console/pom.xml +++ b/activemq-console/pom.xml @@ -168,7 +168,20 @@ - + + org.apache.activemq.protobuf + activemq-protobuf + + src/main/protobuf + + + + + compile + + + + org.fusesource.hawtbuf hawtbuf-protoc diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/StoreBackupCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/StoreBackupCommand.java new file mode 100644 index 0000000000..43a6f278bb --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/StoreBackupCommand.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import org.apache.activemq.console.CommandContext; +import org.apache.activemq.console.command.store.StoreBackup; +import org.apache.activemq.console.command.store.amq.CommandLineSupport; + +import java.util.Arrays; +import java.util.List; + +public class StoreBackupCommand implements Command { + + private CommandContext context; + + @Override + public void setCommandContext(CommandContext context) { + this.context = context; + } + + @Override + public String getName() { + return "backup"; + } + + @Override + public String getOneLineDescription() { + return "Backup a message (or range) from a queue to an archive file"; + } + + @Override + public void execute(List tokens) throws Exception { + StoreBackup backup = new StoreBackup(); + String[] remaining = CommandLineSupport.setOptions(backup, tokens.toArray(new String[tokens.size()])); + if (remaining.length > 0) { + throw new Exception("Unexpected arguments: " + Arrays.asList(remaining)); + } + backup.execute(); + } +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java index c53add7039..4da756e632 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java @@ -23,9 +23,6 @@ import org.apache.activemq.console.command.store.amq.CommandLineSupport; import java.util.Arrays; import java.util.List; -/** - * @author Hiram Chirino - */ public class StoreExportCommand implements Command { private CommandContext context; diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/BackupStreamManager.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/BackupStreamManager.java new file mode 100644 index 0000000000..b35ce0514a --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/BackupStreamManager.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command.store; + +import org.apache.activemq.console.command.store.protobuf.*; +import org.apache.activemq.console.command.store.tar.TarEntry; +import org.apache.activemq.console.command.store.tar.TarOutputStream; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.protobuf.MessageBuffer; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.GZIPOutputStream; + +public class BackupStreamManager { + + private final OutputStream target; + private final int version; + TarOutputStream stream; + + BackupStreamManager(OutputStream target, int version) throws IOException { + this.target = target; + this.version = version; + stream = new TarOutputStream(new GZIPOutputStream(target)); + store("ver", new AsciiBuffer(""+version)); + } + + + long seq = 0; + + public void finish() throws IOException { + stream.close(); + } + + private void store(String ext, Buffer value) throws IOException { + TarEntry entry = new TarEntry(seq + "." + ext); + seq += 1; + entry.setSize(value.getLength()); + stream.putNextEntry(entry); + stream.write(value.getData()); + stream.closeEntry(); + } + + private void store(String ext, MessageBuffer value) throws IOException { + TarEntry entry = new TarEntry(seq + "." + ext); + seq += 1; + entry.setSize(value.serializedSizeFramed()); + stream.putNextEntry(entry); + value.writeFramed(stream); + stream.closeEntry(); + } + + + public void store_queue(QueuePB value) throws IOException { + store("que", value.toUnframedBuffer()); + } + public void store_queue_entry(QueueEntryPB value) throws IOException { + store("qen", value.toUnframedBuffer()); + } + public void store_message(MessagePB value) throws IOException { + store("msg", value.toUnframedBuffer()); + } + public void store_map_entry(MapEntryPB value) throws IOException { + store("map", value.toUnframedBuffer()); + } + +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java new file mode 100644 index 0000000000..026accf0eb --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command.store; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.console.command.store.protobuf.MessagePB; +import org.apache.activemq.console.command.store.protobuf.QueueEntryPB; +import org.apache.activemq.console.command.store.protobuf.QueuePB; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionRecoveryListener; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.apache.activemq.protobuf.AsciiBuffer; +import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.protobuf.UTF8Buffer; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class StoreBackup { + + static final int OPENWIRE_VERSION = 11; + static final boolean TIGHT_ENCODING = false; + + URI config; + String filename; + File file; + + String queue; + Integer offset; + Integer count; + + private final ObjectMapper mapper = new ObjectMapper(); + private final AsciiBuffer ds_kind = new AsciiBuffer("ds"); + private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp"); + private final AsciiBuffer codec_id = new AsciiBuffer("openwire"); + private final OpenWireFormat wireformat = new OpenWireFormat(); + + public StoreBackup() throws URISyntaxException { + config = new URI("xbean:activemq.xml"); + wireformat.setCacheEnabled(false); + wireformat.setTightEncodingEnabled(TIGHT_ENCODING); + wireformat.setVersion(OPENWIRE_VERSION); + } + + public void execute() throws Exception { + if (config == null) { + throw new Exception("required --config option missing"); + } + if (filename == null) { + throw new Exception("required --filename option missing"); + } + + if (offset != null && count == null) { + throw new Exception("optional --offset and --count must be specified together"); + } + + setFile(new File(filename)); + System.out.println("Loading: " + config); + BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting.. + BrokerService broker = BrokerFactory.createBroker(config); + BrokerFactory.resetStartDefault(); + PersistenceAdapter store = broker.getPersistenceAdapter(); + + System.out.println("Starting: " + store); + store.start(); + try(BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file))) { + export(store, fos); + } finally { + store.stop(); + } + } + + void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception { + + final long[] messageKeyCounter = new long[]{0}; + final long[] containerKeyCounter = new long[]{0}; + final BackupStreamManager manager = new BackupStreamManager(fos, 1); + + final int[] preparedTxs = new int[]{0}; + store.createTransactionStore().recover(new TransactionRecoveryListener() { + @Override + public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { + preparedTxs[0] += 1; + } + }); + + if (preparedTxs[0] > 0) { + throw new Exception("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to backup."); + } + + for (ActiveMQDestination odest : store.getDestinations()) { + + if(queue != null && !queue.equals(odest.getPhysicalName())) { + continue; + } + + containerKeyCounter[0]++; + if (odest instanceof ActiveMQQueue) { + ActiveMQQueue dest = (ActiveMQQueue) odest; + MessageStore queue = store.createQueueMessageStore(dest); + + QueuePB destRecord = new QueuePB(); + destRecord.setKey(containerKeyCounter[0]); + destRecord.setBindingKind(ptp_kind); + + final long[] seqKeyCounter = new long[]{0}; + + HashMap jsonMap = new HashMap(); + jsonMap.put("@class", "queue_destination"); + jsonMap.put("name", dest.getQueueName()); + String json = mapper.writeValueAsString(jsonMap); + System.out.println(json); + destRecord.setBindingData(new UTF8Buffer(json)); + manager.store_queue(destRecord); + + MessageRecoveryListener queueRecoveryListener = new MessageRecoveryListener() { + + @Override + public boolean hasSpace() { + return true; + } + + @Override + public boolean recoverMessageReference(MessageId ref) throws Exception { + return true; + } + + @Override + public boolean isDuplicate(MessageId ref) { + return false; + } + + @Override + public boolean recoverMessage(Message message) throws IOException { + messageKeyCounter[0]++; + seqKeyCounter[0]++; + + MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]); + manager.store_message(messageRecord); + + QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]); + manager.store_queue_entry(entryRecord); + + return true; + } + }; + if(offset != null) { + queue.recoverNextMessages(offset, count, queueRecoveryListener); + } else { + queue.recover(queueRecoveryListener); + } + } else if (odest instanceof ActiveMQTopic) { + ActiveMQTopic dest = (ActiveMQTopic) odest; + + TopicMessageStore topic = store.createTopicMessageStore(dest); + for (SubscriptionInfo sub : topic.getAllSubscriptions()) { + + QueuePB destRecord = new QueuePB(); + destRecord.setKey(containerKeyCounter[0]); + destRecord.setBindingKind(ds_kind); + + // TODO: use a real JSON encoder like jackson. + HashMap jsonMap = new HashMap(); + jsonMap.put("@class", "dsub_destination"); + jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName()); + HashMap jsonTopic = new HashMap(); + jsonTopic.put("name", dest.getTopicName()); + jsonMap.put("topics", new Object[]{jsonTopic}); + if (sub.getSelector() != null) { + jsonMap.put("selector", sub.getSelector()); + } + jsonMap.put("noLocal", sub.isNoLocal()); + String json = mapper.writeValueAsString(jsonMap); + System.out.println(json); + + destRecord.setBindingData(new UTF8Buffer(json)); + manager.store_queue(destRecord); + + final long seqKeyCounter[] = new long[]{0}; + topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() { + @Override + public boolean hasSpace() { + return true; + } + + @Override + public boolean recoverMessageReference(MessageId ref) throws Exception { + return true; + } + + @Override + public boolean isDuplicate(MessageId ref) { + return false; + } + + @Override + public boolean recoverMessage(Message message) throws IOException { + messageKeyCounter[0]++; + seqKeyCounter[0]++; + + MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]); + manager.store_message(messageRecord); + + QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]); + manager.store_queue_entry(entryRecord); + return true; + } + }); + + } + } + } + manager.finish(); + } + + private QueueEntryPB createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) { + QueueEntryPB entryRecord = new QueueEntryPB(); + entryRecord.setQueueKey(queueKey); + entryRecord.setQueueSeq(queueSeq); + entryRecord.setMessageKey(messageKey); + entryRecord.setSize(message.getSize()); + if (message.getExpiration() != 0) { + entryRecord.setExpiration(message.getExpiration()); + } + if (message.getRedeliveryCounter() != 0) { + entryRecord.setRedeliveries(message.getRedeliveryCounter()); + } + return entryRecord; + } + + private MessagePB createMessagePB(Message message, long messageKey) throws IOException { + DataByteArrayOutputStream mos = new DataByteArrayOutputStream(); + mos.writeBoolean(TIGHT_ENCODING); + mos.writeVarInt(OPENWIRE_VERSION); + wireformat.marshal(message, mos); + + MessagePB messageRecord = new MessagePB(); + messageRecord.setCodec(codec_id); + messageRecord.setMessageKey(messageKey); + messageRecord.setSize(message.getSize()); + messageRecord.setValue(new Buffer(mos.toBuffer().getData())); + return messageRecord; + } + + public File getFile() { + return file; + } + + public void setFilename(String filename) { + this.filename = filename; + } + + public String getFilename() { + return filename; + } + + public void setFile(File file) { + this.file = file; + } + + public URI getConfig() { + return config; + } + + public void setConfig(URI config) { + this.config = config; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getQueue() { + return queue; + } + + public void setOffset(int offset) { + this.offset = offset; + } + + public Integer getOffset() { + return offset; + } + + public void setCount(int count) { + this.count = count; + } + + public Integer getCount() { + return count; + } +} diff --git a/activemq-console/src/main/protobuf/data.proto b/activemq-console/src/main/protobuf/data.proto new file mode 100644 index 0000000000..0cc765d079 --- /dev/null +++ b/activemq-console/src/main/protobuf/data.proto @@ -0,0 +1,60 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.apache.activemq.console.command.store.protobuf; + +option java_multiple_files = true; + +/////////////////////////////////////////////////////////////// +// Message related operations. +/////////////////////////////////////////////////////////////// + +message MessagePB { + required int64 messageKey=1; + required bytes codec = 2 [java_override_type = "AsciiBuffer"]; + optional int32 size = 3; + optional bytes value = 4; + optional sint64 expiration = 5; + optional int32 compression = 6; + + optional bytes direct_data = 10; + optional bytes direct_file = 12; + optional int64 direct_offset = 13; + optional int32 direct_size = 14; +} + +message QueuePB { + required int64 key=1; + optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"]; + optional bytes binding_data = 3; +} + +message QueueEntryPB { + required int64 queueKey=1; + required int64 queueSeq=2; + required int64 messageKey=3; + optional int32 size=4; + optional bytes attachment=5; + optional int32 redeliveries = 6; + optional sint64 expiration=7; + optional bytes messageLocator=8; + repeated bytes sender=9; +} + +message MapEntryPB { + required bytes key = 1; + optional bytes value = 2; +} diff --git a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command index bb7ddb629c..7df27efab8 100644 --- a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command +++ b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command @@ -28,3 +28,4 @@ org.apache.activemq.console.command.StoreExportCommand org.apache.activemq.console.command.PurgeCommand org.apache.activemq.console.command.ProducerCommand org.apache.activemq.console.command.ConsumerCommand +org.apache.activemq.console.command.StoreBackupCommand