merge #115 - doc changes on architecture including picture changes
This commit is contained in:
commit
41b2232907
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.core.journal.impl;
|
package org.apache.activemq.tools;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -24,18 +24,20 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.core.journal.RecordInfo;
|
import org.apache.activemq.core.journal.RecordInfo;
|
||||||
import org.apache.activemq.core.journal.SequentialFileFactory;
|
import org.apache.activemq.core.journal.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.core.journal.impl.JournalFile;
|
||||||
|
import org.apache.activemq.core.journal.impl.JournalImpl;
|
||||||
|
import org.apache.activemq.core.journal.impl.JournalReaderCallback;
|
||||||
|
import org.apache.activemq.core.journal.impl.NIOSequentialFileFactory;
|
||||||
import org.apache.activemq.utils.Base64;
|
import org.apache.activemq.utils.Base64;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use this class to export the journal data. You can use it as a main class or through its native method {@link ExportJournal#exportJournal(String, String, String, int, int, String)}
|
* Use this class to export the journal data. You can use it as a main class or through its static method {@link #exportJournal(String, String, String, int, int, String)}
|
||||||
*
|
* <p/>
|
||||||
* If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>
|
* If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>
|
||||||
*
|
* <p/>
|
||||||
* Example: java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
|
* Example: java -cp activemq-tools*-jar-with-dependencies.jar export-journal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
|
||||||
*
|
*
|
||||||
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
|
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public class ExportJournal
|
public class ExportJournal
|
||||||
{
|
{
|
||||||
|
@ -50,17 +52,21 @@ public class ExportJournal
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
public static void main(final String[] arg)
|
public void process(final String[] arg)
|
||||||
{
|
{
|
||||||
if (arg.length != 5)
|
if (arg.length != 6)
|
||||||
{
|
{
|
||||||
System.err.println("Use: java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
|
for (int i = 0; i < arg.length; i++)
|
||||||
return;
|
{
|
||||||
|
System.out.println("arg[" + i + "] = " + arg[i]);
|
||||||
|
}
|
||||||
|
printUsage();
|
||||||
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
|
exportJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
@ -83,7 +89,7 @@ public class ExportJournal
|
||||||
|
|
||||||
PrintStream out = new PrintStream(buffOut);
|
PrintStream out = new PrintStream(buffOut);
|
||||||
|
|
||||||
ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
|
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
|
||||||
|
|
||||||
out.close();
|
out.close();
|
||||||
}
|
}
|
||||||
|
@ -105,7 +111,7 @@ public class ExportJournal
|
||||||
{
|
{
|
||||||
out.println("#File," + file);
|
out.println("#File," + file);
|
||||||
|
|
||||||
ExportJournal.exportJournalFile(out, nio, file);
|
exportJournalFile(out, nio, file);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,12 +130,12 @@ public class ExportJournal
|
||||||
|
|
||||||
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
|
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
|
||||||
{
|
{
|
||||||
out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
|
out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
|
public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
|
||||||
{
|
{
|
||||||
out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
|
out.println("operation@Update," + describeRecord(recordInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onReadRollbackRecord(final long transactionID) throws Exception
|
public void onReadRollbackRecord(final long transactionID) throws Exception
|
||||||
|
@ -140,17 +146,17 @@ public class ExportJournal
|
||||||
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
|
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
|
||||||
{
|
{
|
||||||
out.println("operation@Prepare,txID@" + transactionID +
|
out.println("operation@Prepare,txID@" + transactionID +
|
||||||
",numberOfRecords@" +
|
",numberOfRecords@" +
|
||||||
numberOfRecords +
|
numberOfRecords +
|
||||||
",extraData@" +
|
",extraData@" +
|
||||||
ExportJournal.encode(extraData));
|
encode(extraData));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
|
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
|
||||||
{
|
{
|
||||||
out.println("operation@DeleteRecordTX,txID@" + transactionID +
|
out.println("operation@DeleteRecordTX,txID@" + transactionID +
|
||||||
"," +
|
"," +
|
||||||
ExportJournal.describeRecord(recordInfo));
|
describeRecord(recordInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onReadDeleteRecord(final long recordID) throws Exception
|
public void onReadDeleteRecord(final long recordID) throws Exception
|
||||||
|
@ -165,12 +171,12 @@ public class ExportJournal
|
||||||
|
|
||||||
public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
|
public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
|
||||||
{
|
{
|
||||||
out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
|
out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
|
public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
|
||||||
{
|
{
|
||||||
out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
|
out.println("operation@AddRecord," + describeRecord(recordInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void markAsDataFile(final JournalFile file)
|
public void markAsDataFile(final JournalFile file)
|
||||||
|
@ -182,16 +188,16 @@ public class ExportJournal
|
||||||
private static String describeRecord(final RecordInfo recordInfo)
|
private static String describeRecord(final RecordInfo recordInfo)
|
||||||
{
|
{
|
||||||
return "id@" + recordInfo.id +
|
return "id@" + recordInfo.id +
|
||||||
",userRecordType@" +
|
",userRecordType@" +
|
||||||
recordInfo.userRecordType +
|
recordInfo.userRecordType +
|
||||||
",length@" +
|
",length@" +
|
||||||
recordInfo.data.length +
|
recordInfo.data.length +
|
||||||
",isUpdate@" +
|
",isUpdate@" +
|
||||||
recordInfo.isUpdate +
|
recordInfo.isUpdate +
|
||||||
",compactCount@" +
|
",compactCount@" +
|
||||||
recordInfo.compactCount +
|
recordInfo.compactCount +
|
||||||
",data@" +
|
",data@" +
|
||||||
ExportJournal.encode(recordInfo.data);
|
encode(recordInfo.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String encode(final byte[] data)
|
private static String encode(final byte[] data)
|
||||||
|
@ -199,12 +205,22 @@ public class ExportJournal
|
||||||
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Package protected ---------------------------------------------
|
|
||||||
|
|
||||||
// Protected -----------------------------------------------------
|
public void printUsage()
|
||||||
|
{
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
System.err.println();
|
||||||
|
}
|
||||||
|
System.err.println("This method will export the journal at low level record.");
|
||||||
|
System.err.println();
|
||||||
|
System.err.println(Main.USAGE + " export-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
|
||||||
|
System.err.println();
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
System.err.println();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Private -------------------------------------------------------
|
|
||||||
|
|
||||||
// Inner classes -------------------------------------------------
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.core.journal.impl;
|
package org.apache.activemq.tools;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -28,12 +28,15 @@ import java.util.Properties;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.activemq.core.journal.RecordInfo;
|
import org.apache.activemq.core.journal.RecordInfo;
|
||||||
|
import org.apache.activemq.core.journal.impl.JournalImpl;
|
||||||
|
import org.apache.activemq.core.journal.impl.JournalRecord;
|
||||||
|
import org.apache.activemq.core.journal.impl.NIOSequentialFileFactory;
|
||||||
import org.apache.activemq.utils.Base64;
|
import org.apache.activemq.utils.Base64;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use this class to import the journal data from a listed file. You can use it as a main class or
|
* Use this class to import the journal data from a listed file. You can use it as a main class or
|
||||||
* through its native method
|
* through its native method
|
||||||
* {@link ImportJournal#importJournal(String, String, String, int, int, String)}
|
* {@link #importJournal(String, String, String, int, int, String)}
|
||||||
* <p>
|
* <p>
|
||||||
* If you use the main method, use its arguments as:
|
* If you use the main method, use its arguments as:
|
||||||
*
|
*
|
||||||
|
@ -61,17 +64,26 @@ public class ImportJournal
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
public static void main(final String[] arg)
|
|
||||||
|
public void process(final String[] arg)
|
||||||
{
|
{
|
||||||
if (arg.length != 5)
|
for (int i = 0; i < arg.length; i++)
|
||||||
{
|
{
|
||||||
System.err.println("Use: java -cp activemq-core.jar:netty.jar org.apache.activemq.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
|
System.out.println("arg[" + i + "] = " + arg[i]);
|
||||||
return;
|
}
|
||||||
|
if (arg.length != 6)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < arg.length; i++)
|
||||||
|
{
|
||||||
|
System.out.println("arg[" + i + "] = " + arg[i]);
|
||||||
|
}
|
||||||
|
printUsage();
|
||||||
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
|
importJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
@ -88,7 +100,7 @@ public class ImportJournal
|
||||||
final String fileInput) throws Exception
|
final String fileInput) throws Exception
|
||||||
{
|
{
|
||||||
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
|
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
|
||||||
ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
|
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,7 +112,7 @@ public class ImportJournal
|
||||||
final InputStream stream) throws Exception
|
final InputStream stream) throws Exception
|
||||||
{
|
{
|
||||||
Reader reader = new InputStreamReader(stream);
|
Reader reader = new InputStreamReader(stream);
|
||||||
ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
|
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void importJournal(final String directory,
|
public static void importJournal(final String directory,
|
||||||
|
@ -153,7 +165,7 @@ public class ImportJournal
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
Properties lineProperties = ImportJournal.parseLine(splitLine);
|
Properties lineProperties = parseLine(splitLine);
|
||||||
|
|
||||||
String operation = null;
|
String operation = null;
|
||||||
try
|
try
|
||||||
|
@ -162,41 +174,41 @@ public class ImportJournal
|
||||||
|
|
||||||
if (operation.equals("AddRecord"))
|
if (operation.equals("AddRecord"))
|
||||||
{
|
{
|
||||||
RecordInfo info = ImportJournal.parseRecord(lineProperties);
|
RecordInfo info = parseRecord(lineProperties);
|
||||||
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
|
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
|
||||||
}
|
}
|
||||||
else if (operation.equals("AddRecordTX"))
|
else if (operation.equals("AddRecordTX"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
|
AtomicInteger counter = getCounter(txID, txCounters);
|
||||||
counter.incrementAndGet();
|
counter.incrementAndGet();
|
||||||
RecordInfo info = ImportJournal.parseRecord(lineProperties);
|
RecordInfo info = parseRecord(lineProperties);
|
||||||
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
|
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
|
||||||
}
|
}
|
||||||
else if (operation.equals("AddRecordTX"))
|
else if (operation.equals("AddRecordTX"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
|
AtomicInteger counter = getCounter(txID, txCounters);
|
||||||
counter.incrementAndGet();
|
counter.incrementAndGet();
|
||||||
RecordInfo info = ImportJournal.parseRecord(lineProperties);
|
RecordInfo info = parseRecord(lineProperties);
|
||||||
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
|
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
|
||||||
}
|
}
|
||||||
else if (operation.equals("UpdateTX"))
|
else if (operation.equals("UpdateTX"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
|
AtomicInteger counter = getCounter(txID, txCounters);
|
||||||
counter.incrementAndGet();
|
counter.incrementAndGet();
|
||||||
RecordInfo info = ImportJournal.parseRecord(lineProperties);
|
RecordInfo info = parseRecord(lineProperties);
|
||||||
journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
|
journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
|
||||||
}
|
}
|
||||||
else if (operation.equals("Update"))
|
else if (operation.equals("Update"))
|
||||||
{
|
{
|
||||||
RecordInfo info = ImportJournal.parseRecord(lineProperties);
|
RecordInfo info = parseRecord(lineProperties);
|
||||||
journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
|
journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
|
||||||
}
|
}
|
||||||
else if (operation.equals("DeleteRecord"))
|
else if (operation.equals("DeleteRecord"))
|
||||||
{
|
{
|
||||||
long id = ImportJournal.parseLong("id", lineProperties);
|
long id = parseLong("id", lineProperties);
|
||||||
|
|
||||||
// If not found it means the append/update records were reclaimed already
|
// If not found it means the append/update records were reclaimed already
|
||||||
if (journalRecords.get(id) != null)
|
if (journalRecords.get(id) != null)
|
||||||
|
@ -206,9 +218,9 @@ public class ImportJournal
|
||||||
}
|
}
|
||||||
else if (operation.equals("DeleteRecordTX"))
|
else if (operation.equals("DeleteRecordTX"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
long id = ImportJournal.parseLong("id", lineProperties);
|
long id = parseLong("id", lineProperties);
|
||||||
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
|
AtomicInteger counter = getCounter(txID, txCounters);
|
||||||
counter.incrementAndGet();
|
counter.incrementAndGet();
|
||||||
|
|
||||||
// If not found it means the append/update records were reclaimed already
|
// If not found it means the append/update records were reclaimed already
|
||||||
|
@ -219,10 +231,10 @@ public class ImportJournal
|
||||||
}
|
}
|
||||||
else if (operation.equals("Prepare"))
|
else if (operation.equals("Prepare"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
|
int numberOfRecords = parseInt("numberOfRecords", lineProperties);
|
||||||
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
|
AtomicInteger counter = getCounter(txID, txCounters);
|
||||||
byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
|
byte[] data = parseEncoding("extraData", lineProperties);
|
||||||
|
|
||||||
if (counter.get() == numberOfRecords)
|
if (counter.get() == numberOfRecords)
|
||||||
{
|
{
|
||||||
|
@ -241,9 +253,9 @@ public class ImportJournal
|
||||||
}
|
}
|
||||||
else if (operation.equals("Commit"))
|
else if (operation.equals("Commit"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
|
int numberOfRecords = parseInt("numberOfRecords", lineProperties);
|
||||||
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
|
AtomicInteger counter = getCounter(txID, txCounters);
|
||||||
if (counter.get() == numberOfRecords)
|
if (counter.get() == numberOfRecords)
|
||||||
{
|
{
|
||||||
journal.appendCommitRecord(txID, false);
|
journal.appendCommitRecord(txID, false);
|
||||||
|
@ -261,12 +273,12 @@ public class ImportJournal
|
||||||
}
|
}
|
||||||
else if (operation.equals("Rollback"))
|
else if (operation.equals("Rollback"))
|
||||||
{
|
{
|
||||||
long txID = ImportJournal.parseLong("txID", lineProperties);
|
long txID = parseLong("txID", lineProperties);
|
||||||
journal.appendRollbackRecord(txID, false);
|
journal.appendRollbackRecord(txID, false);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
System.err.println("Invalid opeartion " + operation + " at line " + lineNumber);
|
System.err.println("Invalid operation " + operation + " at line " + lineNumber);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
|
@ -293,18 +305,18 @@ public class ImportJournal
|
||||||
|
|
||||||
protected static RecordInfo parseRecord(final Properties properties) throws Exception
|
protected static RecordInfo parseRecord(final Properties properties) throws Exception
|
||||||
{
|
{
|
||||||
long id = ImportJournal.parseLong("id", properties);
|
long id = parseLong("id", properties);
|
||||||
byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
|
byte userRecordType = parseByte("userRecordType", properties);
|
||||||
boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
|
boolean isUpdate = parseBoolean("isUpdate", properties);
|
||||||
byte[] data = ImportJournal.parseEncoding("data", properties);
|
byte[] data = parseEncoding("data", properties);
|
||||||
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
|
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
|
private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
|
||||||
{
|
{
|
||||||
String value = ImportJournal.parseString(name, properties);
|
String value = parseString(name, properties);
|
||||||
|
|
||||||
return ImportJournal.decode(value);
|
return decode(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -313,28 +325,28 @@ public class ImportJournal
|
||||||
*/
|
*/
|
||||||
private static int parseInt(final String name, final Properties properties) throws Exception
|
private static int parseInt(final String name, final Properties properties) throws Exception
|
||||||
{
|
{
|
||||||
String value = ImportJournal.parseString(name, properties);
|
String value = parseString(name, properties);
|
||||||
|
|
||||||
return Integer.parseInt(value);
|
return Integer.parseInt(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long parseLong(final String name, final Properties properties) throws Exception
|
private static long parseLong(final String name, final Properties properties) throws Exception
|
||||||
{
|
{
|
||||||
String value = ImportJournal.parseString(name, properties);
|
String value = parseString(name, properties);
|
||||||
|
|
||||||
return Long.parseLong(value);
|
return Long.parseLong(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean parseBoolean(final String name, final Properties properties) throws Exception
|
private static boolean parseBoolean(final String name, final Properties properties) throws Exception
|
||||||
{
|
{
|
||||||
String value = ImportJournal.parseString(name, properties);
|
String value = parseString(name, properties);
|
||||||
|
|
||||||
return Boolean.parseBoolean(value);
|
return Boolean.parseBoolean(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte parseByte(final String name, final Properties properties) throws Exception
|
private static byte parseByte(final String name, final Properties properties) throws Exception
|
||||||
{
|
{
|
||||||
String value = ImportJournal.parseString(name, properties);
|
String value = parseString(name, properties);
|
||||||
|
|
||||||
return Byte.parseByte(value);
|
return Byte.parseByte(value);
|
||||||
}
|
}
|
||||||
|
@ -381,12 +393,21 @@ public class ImportJournal
|
||||||
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Package protected ---------------------------------------------
|
|
||||||
|
|
||||||
// Protected -----------------------------------------------------
|
public void printUsage()
|
||||||
|
{
|
||||||
// Private -------------------------------------------------------
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
// Inner classes -------------------------------------------------
|
System.err.println();
|
||||||
|
}
|
||||||
|
System.err.println("This method will export the journal at low level record.");
|
||||||
|
System.err.println();
|
||||||
|
System.err.println(Main.USAGE + " import-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileInput>");
|
||||||
|
System.err.println();
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
System.err.println();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -25,7 +25,9 @@ public class Main
|
||||||
private static final String PRINT_PAGES = "print-pages";
|
private static final String PRINT_PAGES = "print-pages";
|
||||||
private static final String DATA_TOOL = "data-tool";
|
private static final String DATA_TOOL = "data-tool";
|
||||||
private static final String TRANSFER = "transfer-queue";
|
private static final String TRANSFER = "transfer-queue";
|
||||||
private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "]";
|
private static final String EXPORT_JOURNAL = "export-journal";
|
||||||
|
private static final String IMPORT_JOURNAL = "import-journal";
|
||||||
|
private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "|" + EXPORT_JOURNAL + "|" + IMPORT_JOURNAL + "]";
|
||||||
|
|
||||||
public static void main(String[] arg) throws Exception
|
public static void main(String[] arg) throws Exception
|
||||||
{
|
{
|
||||||
|
@ -36,7 +38,17 @@ public class Main
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (TRANSFER.equals(arg[0]))
|
if (IMPORT_JOURNAL.equals(arg[0]))
|
||||||
|
{
|
||||||
|
ImportJournal tool = new ImportJournal();
|
||||||
|
tool.process(arg);
|
||||||
|
}
|
||||||
|
else if (EXPORT_JOURNAL.equals(arg[0]))
|
||||||
|
{
|
||||||
|
ExportJournal tool = new ExportJournal();
|
||||||
|
tool.process(arg);
|
||||||
|
}
|
||||||
|
else if (TRANSFER.equals(arg[0]))
|
||||||
{
|
{
|
||||||
TransferQueue tool = new TransferQueue();
|
TransferQueue tool = new TransferQueue();
|
||||||
tool.process(arg);
|
tool.process(arg);
|
||||||
|
|
|
@ -65,8 +65,8 @@ under the License.
|
||||||
|
|
||||||
<release>
|
<release>
|
||||||
<Version>
|
<Version>
|
||||||
<name>2.4.0</name>
|
<name>6.0.0</name>
|
||||||
<revision>2.4.0.final</revision>
|
<revision>6.0.0.final</revision>
|
||||||
<created>2013-16-12</created>
|
<created>2013-16-12</created>
|
||||||
</Version>
|
</Version>
|
||||||
</release>
|
</release>
|
|
@ -28,14 +28,21 @@ ActiveMQ clients, potentially on different physical machines interact
|
||||||
with the ActiveMQ server. ActiveMQ currently provides two APIs for
|
with the ActiveMQ server. ActiveMQ currently provides two APIs for
|
||||||
messaging at the client side:
|
messaging at the client side:
|
||||||
|
|
||||||
1. Core client API. This is a simple intuitive Java API that allows the
|
1. Core client API. This is a simple intuitive Java API that allows the
|
||||||
full set of messaging functionality without some of the complexities
|
full set of messaging functionality without some of the complexities
|
||||||
of JMS.
|
of JMS.
|
||||||
|
|
||||||
2. JMS client API. The standard JMS API is available at the client
|
2. JMS client API. The standard JMS API is available at the client
|
||||||
side.
|
side.
|
||||||
|
|
||||||
JMS semantics are implemented by a thin JMS facade layer on the client
|
ActiveMQ also provides different protocol implementations on the server so you can use respective clients for these protocols:
|
||||||
|
|
||||||
|
1. Stomp
|
||||||
|
2. OpenWire
|
||||||
|
3. AMQP
|
||||||
|
|
||||||
|
|
||||||
|
JMS semantics are implemented by a JMS facade layer on the client
|
||||||
side.
|
side.
|
||||||
|
|
||||||
The ActiveMQ server does not speak JMS and in fact does not know
|
The ActiveMQ server does not speak JMS and in fact does not know
|
||||||
|
@ -66,7 +73,7 @@ application that requires messaging functionality internally but you
|
||||||
don't want to expose that as a ActiveMQ server you can directly
|
don't want to expose that as a ActiveMQ server you can directly
|
||||||
instantiate and embed ActiveMQ servers in your own application.
|
instantiate and embed ActiveMQ servers in your own application.
|
||||||
|
|
||||||
For more information on embedding ActiveMQ, see [Embedding HornetQ](embedding-hornetq.md).
|
For more information on embedding ActiveMQ, see [Embedding ActiveMQ](embedding-activemq.md).
|
||||||
|
|
||||||
## ActiveMQ integrated with a JEE application server
|
## ActiveMQ integrated with a JEE application server
|
||||||
|
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Before Width: | Height: | Size: 20 KiB After Width: | Height: | Size: 37 KiB |
Binary file not shown.
Before Width: | Height: | Size: 18 KiB After Width: | Height: | Size: 15 KiB |
Binary file not shown.
Before Width: | Height: | Size: 12 KiB After Width: | Height: | Size: 9.4 KiB |
|
@ -170,7 +170,7 @@ Let's take a brief look at these:
|
||||||
### Java Message Service (JMS)
|
### Java Message Service (JMS)
|
||||||
|
|
||||||
[JMS](http://en.wikipedia.org/wiki/Java_Message_Service) is part of
|
[JMS](http://en.wikipedia.org/wiki/Java_Message_Service) is part of
|
||||||
Sun's JEE specification. It's a Java API that encapsulates both message
|
Oracle's JEE specification. It's a Java API that encapsulates both message
|
||||||
queue and publish-subscribe messaging patterns. JMS is a lowest common
|
queue and publish-subscribe messaging patterns. JMS is a lowest common
|
||||||
denominator specification - i.e. it was created to encapsulate common
|
denominator specification - i.e. it was created to encapsulate common
|
||||||
functionality of the already existing messaging systems that were
|
functionality of the already existing messaging systems that were
|
||||||
|
|
|
@ -354,31 +354,24 @@ message data, duplicate id caches or paging data will be persisted.
|
||||||
|
|
||||||
You may want to inspect the existent records on each one of the journals
|
You may want to inspect the existent records on each one of the journals
|
||||||
used by ActiveMQ, and you can use the export/import tool for that
|
used by ActiveMQ, and you can use the export/import tool for that
|
||||||
purpose. The export/import are classes located at the activemq-core.jar,
|
purpose.
|
||||||
you can export the journal as a text file by using this command:
|
you can export the journal as a text file by using this command:
|
||||||
|
|
||||||
`java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal
|
`java -cp activemq-tools-jar-with-dependencies.jar export-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>`
|
||||||
<JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>
|
|
||||||
<FileOutput>`
|
|
||||||
|
|
||||||
To import the file as binary data on the journal (Notice you also
|
To import the file as binary data on the journal (Notice you also
|
||||||
require netty.jar):
|
require netty.jar):
|
||||||
|
|
||||||
`java -cp activemq-core.jar:netty.jar org.apache.activemq.core.journal.impl.ImportJournal
|
`java -cp activemq-tools-jar-with-dependencies.jar import-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileInput>`
|
||||||
<JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>
|
|
||||||
<FileInput>`
|
|
||||||
|
|
||||||
- JournalDirectory: Use the configured folder for your selected
|
- JournalDirectory: Use the configured folder for your selected folder. Example: ./activemq/data/journal
|
||||||
folder. Example: ./activemq/data/journal
|
|
||||||
|
|
||||||
- JournalPrefix: Use the prefix for your selected journal, as
|
- JournalPrefix: Use the prefix for your selected journal, as discussed above
|
||||||
discussed above
|
|
||||||
|
|
||||||
- FileExtension: Use the extension for your selected journal, as
|
- FileExtension: Use the extension for your selected journal, as discussed above
|
||||||
discussed above
|
|
||||||
|
|
||||||
- FileSize: Use the size for your selected journal, as discussed above
|
- FileSize: Use the size for your selected journal, as discussed above
|
||||||
|
|
||||||
- FileOutput: text file that will contain the exported data
|
- FileOutput or FileInput: text file that will contain the exported data
|
||||||
|
|
||||||
|
|
||||||
|
See [Tools](tools.md) for more information.
|
||||||
|
|
1
pom.xml
1
pom.xml
|
@ -468,6 +468,7 @@
|
||||||
<module>integration/activemq-aerogear-integration</module>
|
<module>integration/activemq-aerogear-integration</module>
|
||||||
<module>integration/activemq-vertx-integration</module>
|
<module>integration/activemq-vertx-integration</module>
|
||||||
<module>distribution</module>
|
<module>distribution</module>
|
||||||
|
<module>tests</module>
|
||||||
</modules>
|
</modules>
|
||||||
</profile>
|
</profile>
|
||||||
<profile>
|
<profile>
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.activemq.core.journal.RecordInfo;
|
||||||
import org.apache.activemq.core.journal.SequentialFile;
|
import org.apache.activemq.core.journal.SequentialFile;
|
||||||
import org.apache.activemq.core.journal.SequentialFileFactory;
|
import org.apache.activemq.core.journal.SequentialFileFactory;
|
||||||
import org.apache.activemq.core.journal.impl.AbstractJournalUpdateTask;
|
import org.apache.activemq.core.journal.impl.AbstractJournalUpdateTask;
|
||||||
import org.apache.activemq.core.journal.impl.ExportJournal;
|
|
||||||
import org.apache.activemq.core.journal.impl.JournalCompactor;
|
import org.apache.activemq.core.journal.impl.JournalCompactor;
|
||||||
import org.apache.activemq.core.journal.impl.JournalFile;
|
import org.apache.activemq.core.journal.impl.JournalFile;
|
||||||
import org.apache.activemq.core.journal.impl.JournalFileImpl;
|
import org.apache.activemq.core.journal.impl.JournalFileImpl;
|
||||||
|
@ -50,6 +49,7 @@ import org.apache.activemq.core.server.impl.ServerMessageImpl;
|
||||||
import org.apache.activemq.tests.unit.core.journal.impl.JournalImplTestBase;
|
import org.apache.activemq.tests.unit.core.journal.impl.JournalImplTestBase;
|
||||||
import org.apache.activemq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
|
import org.apache.activemq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
|
||||||
import org.apache.activemq.tests.util.UnitTestCase;
|
import org.apache.activemq.tests.util.UnitTestCase;
|
||||||
|
import org.apache.activemq.tools.ExportJournal;
|
||||||
import org.apache.activemq.utils.IDGenerator;
|
import org.apache.activemq.utils.IDGenerator;
|
||||||
import org.apache.activemq.utils.OrderedExecutorFactory;
|
import org.apache.activemq.utils.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.utils.SimpleIDGenerator;
|
import org.apache.activemq.utils.SimpleIDGenerator;
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.tests.integration.persistence;
|
package org.apache.activemq.tests.integration.persistence;
|
||||||
|
|
||||||
|
import org.apache.activemq.tools.ExportJournal;
|
||||||
|
import org.apache.activemq.tools.ImportJournal;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -27,8 +29,6 @@ import org.apache.activemq.api.core.client.ClientProducer;
|
||||||
import org.apache.activemq.api.core.client.ClientSession;
|
import org.apache.activemq.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.api.core.client.ServerLocator;
|
import org.apache.activemq.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.core.journal.impl.ExportJournal;
|
|
||||||
import org.apache.activemq.core.journal.impl.ImportJournal;
|
|
||||||
import org.apache.activemq.core.server.ActiveMQServer;
|
import org.apache.activemq.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.tests.util.ServiceTestBase;
|
import org.apache.activemq.tests.util.ServiceTestBase;
|
||||||
|
|
||||||
|
@ -117,10 +117,12 @@ public class ExportFormatTest extends ServiceTestBase
|
||||||
locator.close();
|
locator.close();
|
||||||
server.stop();
|
server.stop();
|
||||||
|
|
||||||
|
System.out.println();
|
||||||
System.out.println("copy & paste the following as bindingsFile:");
|
System.out.println("copy & paste the following as bindingsFile:");
|
||||||
|
|
||||||
ExportJournal.exportJournal(getBindingsDir(), "activemq-bindings", "bindings", 2, 1048576, System.out);
|
ExportJournal.exportJournal(getBindingsDir(), "activemq-bindings", "bindings", 2, 1048576, System.out);
|
||||||
|
|
||||||
|
System.out.println();
|
||||||
System.out.println("copy & paste the following as dataFile:");
|
System.out.println("copy & paste the following as dataFile:");
|
||||||
|
|
||||||
ExportJournal.exportJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, System.out);
|
ExportJournal.exportJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, System.out);
|
||||||
|
|
|
@ -66,6 +66,11 @@
|
||||||
<artifactId>activemq-bootstrap</artifactId>
|
<artifactId>activemq-bootstrap</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.activemq</groupId>
|
||||||
|
<artifactId>activemq-tools</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.geronimo.specs</groupId>
|
<groupId>org.apache.geronimo.specs</groupId>
|
||||||
<artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
|
<artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.tests.unit.core.journal.impl;
|
package org.apache.activemq.tests.unit.core.journal.impl;
|
||||||
|
import org.apache.activemq.tools.ExportJournal;
|
||||||
|
import org.apache.activemq.tools.ImportJournal;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
||||||
|
@ -35,8 +37,6 @@ import org.apache.activemq.core.journal.PreparedTransactionInfo;
|
||||||
import org.apache.activemq.core.journal.RecordInfo;
|
import org.apache.activemq.core.journal.RecordInfo;
|
||||||
import org.apache.activemq.core.journal.SequentialFileFactory;
|
import org.apache.activemq.core.journal.SequentialFileFactory;
|
||||||
import org.apache.activemq.core.journal.TestableJournal;
|
import org.apache.activemq.core.journal.TestableJournal;
|
||||||
import org.apache.activemq.core.journal.impl.ExportJournal;
|
|
||||||
import org.apache.activemq.core.journal.impl.ImportJournal;
|
|
||||||
import org.apache.activemq.core.journal.impl.JournalImpl;
|
import org.apache.activemq.core.journal.impl.JournalImpl;
|
||||||
import org.apache.activemq.tests.util.UnitTestCase;
|
import org.apache.activemq.tests.util.UnitTestCase;
|
||||||
import org.apache.activemq.utils.ReusableLatch;
|
import org.apache.activemq.utils.ReusableLatch;
|
||||||
|
|
Loading…
Reference in New Issue