mirror of https://github.com/apache/activemq.git
Refactor out common logic to keep things dry.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1405218 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ba0101882a
commit
dee9e93266
|
@ -43,11 +43,23 @@ import java.util.HashMap;
|
||||||
*/
|
*/
|
||||||
public class StoreExporter {
|
public class StoreExporter {
|
||||||
|
|
||||||
|
static final int OPENWIRE_VERSION = 8;
|
||||||
|
static final boolean TIGHT_ENCODING = false;
|
||||||
|
|
||||||
URI config;
|
URI config;
|
||||||
File file;
|
File file;
|
||||||
|
|
||||||
|
private ObjectMapper mapper = new ObjectMapper();
|
||||||
|
private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
|
||||||
|
private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
|
||||||
|
private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
|
||||||
|
private final OpenWireFormat wireformat = new OpenWireFormat();
|
||||||
|
|
||||||
public StoreExporter() throws URISyntaxException {
|
public StoreExporter() throws URISyntaxException {
|
||||||
config = new URI("xbean:activemq.xml");
|
config = new URI("xbean:activemq.xml");
|
||||||
|
wireformat.setCacheEnabled(false);
|
||||||
|
wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
|
||||||
|
wireformat.setVersion(OPENWIRE_VERSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute() throws Exception {
|
public void execute() throws Exception {
|
||||||
|
@ -76,19 +88,8 @@ public class StoreExporter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static final int OPENWIRE_VERSION = 8;
|
|
||||||
static final boolean TIGHT_ENCODING = false;
|
|
||||||
|
|
||||||
void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
|
void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
|
||||||
|
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
final AsciiBuffer ds_kind = new AsciiBuffer("ds");
|
|
||||||
final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
|
|
||||||
final AsciiBuffer codec_id = new AsciiBuffer("openwire");
|
|
||||||
final OpenWireFormat wireformat = new OpenWireFormat();
|
|
||||||
wireformat.setCacheEnabled(false);
|
|
||||||
wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
|
|
||||||
wireformat.setVersion(OPENWIRE_VERSION);
|
|
||||||
|
|
||||||
final long[] messageKeyCounter = new long[]{0};
|
final long[] messageKeyCounter = new long[]{0};
|
||||||
final long[] containerKeyCounter = new long[]{0};
|
final long[] containerKeyCounter = new long[]{0};
|
||||||
|
@ -143,31 +144,12 @@ public class StoreExporter {
|
||||||
messageKeyCounter[0]++;
|
messageKeyCounter[0]++;
|
||||||
seqKeyCounter[0]++;
|
seqKeyCounter[0]++;
|
||||||
|
|
||||||
DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
|
MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
|
||||||
mos.writeBoolean(TIGHT_ENCODING);
|
|
||||||
mos.writeVarInt(OPENWIRE_VERSION);
|
|
||||||
wireformat.marshal(message, mos);
|
|
||||||
|
|
||||||
MessagePB.Bean messageRecord = new MessagePB.Bean();
|
|
||||||
messageRecord.setCodec(codec_id);
|
|
||||||
messageRecord.setMessageKey(messageKeyCounter[0]);
|
|
||||||
messageRecord.setSize(message.getSize());
|
|
||||||
messageRecord.setValue(mos.toBuffer());
|
|
||||||
// record.setCompression()
|
|
||||||
manager.store_message(messageRecord);
|
manager.store_message(messageRecord);
|
||||||
|
|
||||||
QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
|
QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
|
||||||
entryRecord.setQueueKey(containerKeyCounter[0]);
|
|
||||||
entryRecord.setQueueSeq(seqKeyCounter[0]);
|
|
||||||
entryRecord.setMessageKey(messageKeyCounter[0]);
|
|
||||||
entryRecord.setSize(message.getSize());
|
|
||||||
if (message.getExpiration() != 0) {
|
|
||||||
entryRecord.setExpiration(message.getExpiration());
|
|
||||||
}
|
|
||||||
if (message.getRedeliveryCounter() != 0) {
|
|
||||||
entryRecord.setRedeliveries(message.getRedeliveryCounter());
|
|
||||||
}
|
|
||||||
manager.store_queue_entry(entryRecord);
|
manager.store_queue_entry(entryRecord);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -216,30 +198,10 @@ public class StoreExporter {
|
||||||
messageKeyCounter[0]++;
|
messageKeyCounter[0]++;
|
||||||
seqKeyCounter[0]++;
|
seqKeyCounter[0]++;
|
||||||
|
|
||||||
DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
|
MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
|
||||||
mos.writeBoolean(TIGHT_ENCODING);
|
|
||||||
mos.writeVarInt(OPENWIRE_VERSION);
|
|
||||||
wireformat.marshal(mos);
|
|
||||||
|
|
||||||
MessagePB.Bean messageRecord = new MessagePB.Bean();
|
|
||||||
messageRecord.setCodec(codec_id);
|
|
||||||
messageRecord.setMessageKey(messageKeyCounter[0]);
|
|
||||||
messageRecord.setSize(message.getSize());
|
|
||||||
messageRecord.setValue(mos.toBuffer());
|
|
||||||
// record.setCompression()
|
|
||||||
manager.store_message(messageRecord);
|
manager.store_message(messageRecord);
|
||||||
|
|
||||||
QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
|
QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
|
||||||
entryRecord.setQueueKey(containerKeyCounter[0]);
|
|
||||||
entryRecord.setQueueSeq(seqKeyCounter[0]);
|
|
||||||
entryRecord.setMessageKey(messageKeyCounter[0]);
|
|
||||||
entryRecord.setSize(message.getSize());
|
|
||||||
if (message.getExpiration() != 0) {
|
|
||||||
entryRecord.setExpiration(message.getExpiration());
|
|
||||||
}
|
|
||||||
if (message.getRedeliveryCounter() != 0) {
|
|
||||||
entryRecord.setRedeliveries(message.getRedeliveryCounter());
|
|
||||||
}
|
|
||||||
manager.store_queue_entry(entryRecord);
|
manager.store_queue_entry(entryRecord);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -251,6 +213,35 @@ public class StoreExporter {
|
||||||
manager.finish();
|
manager.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) {
|
||||||
|
QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
|
||||||
|
entryRecord.setQueueKey(queueKey);
|
||||||
|
entryRecord.setQueueSeq(queueSeq);
|
||||||
|
entryRecord.setMessageKey(messageKey);
|
||||||
|
entryRecord.setSize(message.getSize());
|
||||||
|
if (message.getExpiration() != 0) {
|
||||||
|
entryRecord.setExpiration(message.getExpiration());
|
||||||
|
}
|
||||||
|
if (message.getRedeliveryCounter() != 0) {
|
||||||
|
entryRecord.setRedeliveries(message.getRedeliveryCounter());
|
||||||
|
}
|
||||||
|
return entryRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException {
|
||||||
|
DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
|
||||||
|
mos.writeBoolean(TIGHT_ENCODING);
|
||||||
|
mos.writeVarInt(OPENWIRE_VERSION);
|
||||||
|
wireformat.marshal(message, mos);
|
||||||
|
|
||||||
|
MessagePB.Bean messageRecord = new MessagePB.Bean();
|
||||||
|
messageRecord.setCodec(codec_id);
|
||||||
|
messageRecord.setMessageKey(messageKey);
|
||||||
|
messageRecord.setSize(message.getSize());
|
||||||
|
messageRecord.setValue(mos.toBuffer());
|
||||||
|
return messageRecord;
|
||||||
|
}
|
||||||
|
|
||||||
public File getFile() {
|
public File getFile() {
|
||||||
return file;
|
return file;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue