[AMQ-9533] Add Backup command that uses activemq-protobuf (to replace export)

This commit is contained in:
Matt Pavlovich 2024-07-11 13:17:23 -05:00 committed by Matt Pavlovich
parent dad947fe4a
commit 5309352e12
7 changed files with 537 additions and 4 deletions

View File

@ -168,7 +168,20 @@
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
<configuration>
<sourceDirectory>src/main/protobuf</sourceDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf-protoc</artifactId>

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command;
import org.apache.activemq.console.CommandContext;
import org.apache.activemq.console.command.store.StoreBackup;
import org.apache.activemq.console.command.store.amq.CommandLineSupport;
import java.util.Arrays;
import java.util.List;
public class StoreBackupCommand implements Command {
private CommandContext context;
@Override
public void setCommandContext(CommandContext context) {
this.context = context;
}
@Override
public String getName() {
return "backup";
}
@Override
public String getOneLineDescription() {
return "Backup a message (or range) from a queue to an archive file";
}
@Override
public void execute(List<String> tokens) throws Exception {
StoreBackup backup = new StoreBackup();
String[] remaining = CommandLineSupport.setOptions(backup, tokens.toArray(new String[tokens.size()]));
if (remaining.length > 0) {
throw new Exception("Unexpected arguments: " + Arrays.asList(remaining));
}
backup.execute();
}
}

View File

@ -23,9 +23,6 @@ import org.apache.activemq.console.command.store.amq.CommandLineSupport;
import java.util.Arrays;
import java.util.List;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class StoreExportCommand implements Command {
private CommandContext context;

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store;
import org.apache.activemq.console.command.store.protobuf.*;
import org.apache.activemq.console.command.store.tar.TarEntry;
import org.apache.activemq.console.command.store.tar.TarOutputStream;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.MessageBuffer;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;
public class BackupStreamManager {
private final OutputStream target;
private final int version;
TarOutputStream stream;
BackupStreamManager(OutputStream target, int version) throws IOException {
this.target = target;
this.version = version;
stream = new TarOutputStream(new GZIPOutputStream(target));
store("ver", new AsciiBuffer(""+version));
}
long seq = 0;
public void finish() throws IOException {
stream.close();
}
private void store(String ext, Buffer value) throws IOException {
TarEntry entry = new TarEntry(seq + "." + ext);
seq += 1;
entry.setSize(value.getLength());
stream.putNextEntry(entry);
stream.write(value.getData());
stream.closeEntry();
}
private void store(String ext, MessageBuffer<?,?> value) throws IOException {
TarEntry entry = new TarEntry(seq + "." + ext);
seq += 1;
entry.setSize(value.serializedSizeFramed());
stream.putNextEntry(entry);
value.writeFramed(stream);
stream.closeEntry();
}
public void store_queue(QueuePB value) throws IOException {
store("que", value.toUnframedBuffer());
}
public void store_queue_entry(QueueEntryPB value) throws IOException {
store("qen", value.toUnframedBuffer());
}
public void store_message(MessagePB value) throws IOException {
store("msg", value.toUnframedBuffer());
}
public void store_map_entry(MapEntryPB value) throws IOException {
store("map", value.toUnframedBuffer());
}
}

View File

@ -0,0 +1,326 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command.store;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.console.command.store.protobuf.MessagePB;
import org.apache.activemq.console.command.store.protobuf.QueueEntryPB;
import org.apache.activemq.console.command.store.protobuf.QueuePB;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.UTF8Buffer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class StoreBackup {
static final int OPENWIRE_VERSION = 11;
static final boolean TIGHT_ENCODING = false;
URI config;
String filename;
File file;
String queue;
Integer offset;
Integer count;
private final ObjectMapper mapper = new ObjectMapper();
private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
private final OpenWireFormat wireformat = new OpenWireFormat();
public StoreBackup() throws URISyntaxException {
config = new URI("xbean:activemq.xml");
wireformat.setCacheEnabled(false);
wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
wireformat.setVersion(OPENWIRE_VERSION);
}
public void execute() throws Exception {
if (config == null) {
throw new Exception("required --config option missing");
}
if (filename == null) {
throw new Exception("required --filename option missing");
}
if (offset != null && count == null) {
throw new Exception("optional --offset and --count must be specified together");
}
setFile(new File(filename));
System.out.println("Loading: " + config);
BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
BrokerService broker = BrokerFactory.createBroker(config);
BrokerFactory.resetStartDefault();
PersistenceAdapter store = broker.getPersistenceAdapter();
System.out.println("Starting: " + store);
store.start();
try(BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file))) {
export(store, fos);
} finally {
store.stop();
}
}
void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
final long[] messageKeyCounter = new long[]{0};
final long[] containerKeyCounter = new long[]{0};
final BackupStreamManager manager = new BackupStreamManager(fos, 1);
final int[] preparedTxs = new int[]{0};
store.createTransactionStore().recover(new TransactionRecoveryListener() {
@Override
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
preparedTxs[0] += 1;
}
});
if (preparedTxs[0] > 0) {
throw new Exception("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to backup.");
}
for (ActiveMQDestination odest : store.getDestinations()) {
if(queue != null && !queue.equals(odest.getPhysicalName())) {
continue;
}
containerKeyCounter[0]++;
if (odest instanceof ActiveMQQueue) {
ActiveMQQueue dest = (ActiveMQQueue) odest;
MessageStore queue = store.createQueueMessageStore(dest);
QueuePB destRecord = new QueuePB();
destRecord.setKey(containerKeyCounter[0]);
destRecord.setBindingKind(ptp_kind);
final long[] seqKeyCounter = new long[]{0};
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "queue_destination");
jsonMap.put("name", dest.getQueueName());
String json = mapper.writeValueAsString(jsonMap);
System.out.println(json);
destRecord.setBindingData(new UTF8Buffer(json));
manager.store_queue(destRecord);
MessageRecoveryListener queueRecoveryListener = new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
};
if(offset != null) {
queue.recoverNextMessages(offset, count, queueRecoveryListener);
} else {
queue.recover(queueRecoveryListener);
}
} else if (odest instanceof ActiveMQTopic) {
ActiveMQTopic dest = (ActiveMQTopic) odest;
TopicMessageStore topic = store.createTopicMessageStore(dest);
for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
QueuePB destRecord = new QueuePB();
destRecord.setKey(containerKeyCounter[0]);
destRecord.setBindingKind(ds_kind);
// TODO: use a real JSON encoder like jackson.
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "dsub_destination");
jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
jsonTopic.put("name", dest.getTopicName());
jsonMap.put("topics", new Object[]{jsonTopic});
if (sub.getSelector() != null) {
jsonMap.put("selector", sub.getSelector());
}
jsonMap.put("noLocal", sub.isNoLocal());
String json = mapper.writeValueAsString(jsonMap);
System.out.println(json);
destRecord.setBindingData(new UTF8Buffer(json));
manager.store_queue(destRecord);
final long seqKeyCounter[] = new long[]{0};
topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
});
}
}
}
manager.finish();
}
private QueueEntryPB createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) {
QueueEntryPB entryRecord = new QueueEntryPB();
entryRecord.setQueueKey(queueKey);
entryRecord.setQueueSeq(queueSeq);
entryRecord.setMessageKey(messageKey);
entryRecord.setSize(message.getSize());
if (message.getExpiration() != 0) {
entryRecord.setExpiration(message.getExpiration());
}
if (message.getRedeliveryCounter() != 0) {
entryRecord.setRedeliveries(message.getRedeliveryCounter());
}
return entryRecord;
}
private MessagePB createMessagePB(Message message, long messageKey) throws IOException {
DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
mos.writeBoolean(TIGHT_ENCODING);
mos.writeVarInt(OPENWIRE_VERSION);
wireformat.marshal(message, mos);
MessagePB messageRecord = new MessagePB();
messageRecord.setCodec(codec_id);
messageRecord.setMessageKey(messageKey);
messageRecord.setSize(message.getSize());
messageRecord.setValue(new Buffer(mos.toBuffer().getData()));
return messageRecord;
}
public File getFile() {
return file;
}
public void setFilename(String filename) {
this.filename = filename;
}
public String getFilename() {
return filename;
}
public void setFile(File file) {
this.file = file;
}
public URI getConfig() {
return config;
}
public void setConfig(URI config) {
this.config = config;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getQueue() {
return queue;
}
public void setOffset(int offset) {
this.offset = offset;
}
public Integer getOffset() {
return offset;
}
public void setCount(int count) {
this.count = count;
}
public Integer getCount() {
return count;
}
}

View File

@ -0,0 +1,60 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.apache.activemq.console.command.store.protobuf;
option java_multiple_files = true;
///////////////////////////////////////////////////////////////
// Message related operations.
///////////////////////////////////////////////////////////////
message MessagePB {
required int64 messageKey=1;
required bytes codec = 2 [java_override_type = "AsciiBuffer"];
optional int32 size = 3;
optional bytes value = 4;
optional sint64 expiration = 5;
optional int32 compression = 6;
optional bytes direct_data = 10;
optional bytes direct_file = 12;
optional int64 direct_offset = 13;
optional int32 direct_size = 14;
}
message QueuePB {
required int64 key=1;
optional bytes binding_kind = 2 [java_override_type = "AsciiBuffer"];
optional bytes binding_data = 3;
}
message QueueEntryPB {
required int64 queueKey=1;
required int64 queueSeq=2;
required int64 messageKey=3;
optional int32 size=4;
optional bytes attachment=5;
optional int32 redeliveries = 6;
optional sint64 expiration=7;
optional bytes messageLocator=8;
repeated bytes sender=9;
}
message MapEntryPB {
required bytes key = 1;
optional bytes value = 2;
}

View File

@ -28,3 +28,4 @@ org.apache.activemq.console.command.StoreExportCommand
org.apache.activemq.console.command.PurgeCommand
org.apache.activemq.console.command.ProducerCommand
org.apache.activemq.console.command.ConsumerCommand
org.apache.activemq.console.command.StoreBackupCommand