moving export-journal / import-journal to the proper tools place

This is simply moving the export/import journal to its proper place.
The previous commit should have added docs about this
This commit is contained in:
Clebert Suconic 2015-02-25 22:45:21 -05:00
parent ed7c429624
commit 210222e24f
7 changed files with 152 additions and 96 deletions

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.journal.impl;
package org.apache.activemq.tools;
import java.io.BufferedOutputStream;
import java.io.File;
@ -24,18 +24,20 @@ import java.util.List;
import org.apache.activemq.core.journal.RecordInfo;
import org.apache.activemq.core.journal.SequentialFileFactory;
import org.apache.activemq.core.journal.impl.JournalFile;
import org.apache.activemq.core.journal.impl.JournalImpl;
import org.apache.activemq.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.core.journal.impl.NIOSequentialFileFactory;
import org.apache.activemq.utils.Base64;
/**
* Use this class to export the journal data. You can use it as a main class or through its native method {@link ExportJournal#exportJournal(String, String, String, int, int, String)}
*
* Use this class to export the journal data. You can use it as a main class or through its static method {@link #exportJournal(String, String, String, int, int, String)}
* <p/>
* If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>
*
* Example: java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
* <p/>
* Example: java -cp activemq-tools*-jar-with-dependencies.jar export-journal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
*
*/
public class ExportJournal
{
@ -50,17 +52,21 @@ public class ExportJournal
// Public --------------------------------------------------------
public static void main(final String[] arg)
public void process(final String[] arg)
{
if (arg.length != 5)
if (arg.length != 6)
{
System.err.println("Use: java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
return;
for (int i = 0; i < arg.length; i++)
{
System.out.println("arg[" + i + "] = " + arg[i]);
}
printUsage();
System.exit(-1);
}
try
{
ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
exportJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]);
}
catch (Exception e)
{
@ -83,7 +89,7 @@ public class ExportJournal
PrintStream out = new PrintStream(buffOut);
ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
out.close();
}
@ -105,7 +111,7 @@ public class ExportJournal
{
out.println("#File," + file);
ExportJournal.exportJournalFile(out, nio, file);
exportJournalFile(out, nio, file);
}
}
@ -124,12 +130,12 @@ public class ExportJournal
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
}
public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
{
out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
out.println("operation@Update," + describeRecord(recordInfo));
}
public void onReadRollbackRecord(final long transactionID) throws Exception
@ -143,14 +149,14 @@ public class ExportJournal
",numberOfRecords@" +
numberOfRecords +
",extraData@" +
ExportJournal.encode(extraData));
encode(extraData));
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
out.println("operation@DeleteRecordTX,txID@" + transactionID +
"," +
ExportJournal.describeRecord(recordInfo));
describeRecord(recordInfo));
}
public void onReadDeleteRecord(final long recordID) throws Exception
@ -165,12 +171,12 @@ public class ExportJournal
public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
}
public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
{
out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
out.println("operation@AddRecord," + describeRecord(recordInfo));
}
public void markAsDataFile(final JournalFile file)
@ -191,7 +197,7 @@ public class ExportJournal
",compactCount@" +
recordInfo.compactCount +
",data@" +
ExportJournal.encode(recordInfo.data);
encode(recordInfo.data);
}
private static String encode(final byte[] data)
@ -199,12 +205,22 @@ public class ExportJournal
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
public void printUsage()
{
for (int i = 0; i < 10; i++)
{
System.err.println();
}
System.err.println("This method will export the journal at low level record.");
System.err.println();
System.err.println(Main.USAGE + " export-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
System.err.println();
for (int i = 0; i < 10; i++)
{
System.err.println();
}
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.journal.impl;
package org.apache.activemq.tools;
import java.io.BufferedReader;
import java.io.File;
@ -28,12 +28,15 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.core.journal.RecordInfo;
import org.apache.activemq.core.journal.impl.JournalImpl;
import org.apache.activemq.core.journal.impl.JournalRecord;
import org.apache.activemq.core.journal.impl.NIOSequentialFileFactory;
import org.apache.activemq.utils.Base64;
/**
* Use this class to import the journal data from a listed file. You can use it as a main class or
* through its native method
* {@link ImportJournal#importJournal(String, String, String, int, int, String)}
* {@link #importJournal(String, String, String, int, int, String)}
* <p>
* If you use the main method, use its arguments as:
*
@ -61,17 +64,26 @@ public class ImportJournal
// Public --------------------------------------------------------
public static void main(final String[] arg)
public void process(final String[] arg)
{
if (arg.length != 5)
for (int i = 0; i < arg.length; i++)
{
System.err.println("Use: java -cp activemq-core.jar:netty.jar org.apache.activemq.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
return;
System.out.println("arg[" + i + "] = " + arg[i]);
}
if (arg.length != 6)
{
for (int i = 0; i < arg.length; i++)
{
System.out.println("arg[" + i + "] = " + arg[i]);
}
printUsage();
System.exit(-1);
}
try
{
ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
importJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]);
}
catch (Exception e)
{
@ -88,7 +100,7 @@ public class ImportJournal
final String fileInput) throws Exception
{
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
}
@ -100,7 +112,7 @@ public class ImportJournal
final InputStream stream) throws Exception
{
Reader reader = new InputStreamReader(stream);
ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
}
public static void importJournal(final String directory,
@ -153,7 +165,7 @@ public class ImportJournal
continue;
}
Properties lineProperties = ImportJournal.parseLine(splitLine);
Properties lineProperties = parseLine(splitLine);
String operation = null;
try
@ -162,41 +174,41 @@ public class ImportJournal
if (operation.equals("AddRecord"))
{
RecordInfo info = ImportJournal.parseRecord(lineProperties);
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("AddRecordTX"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
RecordInfo info = ImportJournal.parseRecord(lineProperties);
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("AddRecordTX"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
RecordInfo info = ImportJournal.parseRecord(lineProperties);
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("UpdateTX"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
RecordInfo info = ImportJournal.parseRecord(lineProperties);
RecordInfo info = parseRecord(lineProperties);
journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("Update"))
{
RecordInfo info = ImportJournal.parseRecord(lineProperties);
RecordInfo info = parseRecord(lineProperties);
journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("DeleteRecord"))
{
long id = ImportJournal.parseLong("id", lineProperties);
long id = parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
if (journalRecords.get(id) != null)
@ -206,9 +218,9 @@ public class ImportJournal
}
else if (operation.equals("DeleteRecordTX"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
long id = ImportJournal.parseLong("id", lineProperties);
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
long txID = parseLong("txID", lineProperties);
long id = parseLong("id", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
// If not found it means the append/update records were reclaimed already
@ -219,10 +231,10 @@ public class ImportJournal
}
else if (operation.equals("Prepare"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
long txID = parseLong("txID", lineProperties);
int numberOfRecords = parseInt("numberOfRecords", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
byte[] data = parseEncoding("extraData", lineProperties);
if (counter.get() == numberOfRecords)
{
@ -241,9 +253,9 @@ public class ImportJournal
}
else if (operation.equals("Commit"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
long txID = parseLong("txID", lineProperties);
int numberOfRecords = parseInt("numberOfRecords", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
if (counter.get() == numberOfRecords)
{
journal.appendCommitRecord(txID, false);
@ -261,12 +273,12 @@ public class ImportJournal
}
else if (operation.equals("Rollback"))
{
long txID = ImportJournal.parseLong("txID", lineProperties);
long txID = parseLong("txID", lineProperties);
journal.appendRollbackRecord(txID, false);
}
else
{
System.err.println("Invalid opeartion " + operation + " at line " + lineNumber);
System.err.println("Invalid operation " + operation + " at line " + lineNumber);
}
}
catch (Exception ex)
@ -293,18 +305,18 @@ public class ImportJournal
protected static RecordInfo parseRecord(final Properties properties) throws Exception
{
long id = ImportJournal.parseLong("id", properties);
byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
byte[] data = ImportJournal.parseEncoding("data", properties);
long id = parseLong("id", properties);
byte userRecordType = parseByte("userRecordType", properties);
boolean isUpdate = parseBoolean("isUpdate", properties);
byte[] data = parseEncoding("data", properties);
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
{
String value = ImportJournal.parseString(name, properties);
String value = parseString(name, properties);
return ImportJournal.decode(value);
return decode(value);
}
/**
@ -313,28 +325,28 @@ public class ImportJournal
*/
private static int parseInt(final String name, final Properties properties) throws Exception
{
String value = ImportJournal.parseString(name, properties);
String value = parseString(name, properties);
return Integer.parseInt(value);
}
private static long parseLong(final String name, final Properties properties) throws Exception
{
String value = ImportJournal.parseString(name, properties);
String value = parseString(name, properties);
return Long.parseLong(value);
}
private static boolean parseBoolean(final String name, final Properties properties) throws Exception
{
String value = ImportJournal.parseString(name, properties);
String value = parseString(name, properties);
return Boolean.parseBoolean(value);
}
private static byte parseByte(final String name, final Properties properties) throws Exception
{
String value = ImportJournal.parseString(name, properties);
String value = parseString(name, properties);
return Byte.parseByte(value);
}
@ -381,12 +393,21 @@ public class ImportJournal
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
public void printUsage()
{
for (int i = 0; i < 10; i++)
{
System.err.println();
}
System.err.println("This method will export the journal at low level record.");
System.err.println();
System.err.println(Main.USAGE + " import-journal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileInput>");
System.err.println();
for (int i = 0; i < 10; i++)
{
System.err.println();
}
}
}

View File

@ -25,7 +25,9 @@ public class Main
private static final String PRINT_PAGES = "print-pages";
private static final String DATA_TOOL = "data-tool";
private static final String TRANSFER = "transfer-queue";
private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "]";
private static final String EXPORT_JOURNAL = "export-journal";
private static final String IMPORT_JOURNAL = "import-journal";
private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "|" + EXPORT_JOURNAL + "|" + IMPORT_JOURNAL + "]";
public static void main(String[] arg) throws Exception
{
@ -36,7 +38,17 @@ public class Main
}
if (TRANSFER.equals(arg[0]))
if (IMPORT_JOURNAL.equals(arg[0]))
{
ImportJournal tool = new ImportJournal();
tool.process(arg);
}
else if (EXPORT_JOURNAL.equals(arg[0]))
{
ExportJournal tool = new ExportJournal();
tool.process(arg);
}
else if (TRANSFER.equals(arg[0]))
{
TransferQueue tool = new TransferQueue();
tool.process(arg);

View File

@ -38,7 +38,6 @@ import org.apache.activemq.core.journal.RecordInfo;
import org.apache.activemq.core.journal.SequentialFile;
import org.apache.activemq.core.journal.SequentialFileFactory;
import org.apache.activemq.core.journal.impl.AbstractJournalUpdateTask;
import org.apache.activemq.core.journal.impl.ExportJournal;
import org.apache.activemq.core.journal.impl.JournalCompactor;
import org.apache.activemq.core.journal.impl.JournalFile;
import org.apache.activemq.core.journal.impl.JournalFileImpl;
@ -50,6 +49,7 @@ import org.apache.activemq.core.server.impl.ServerMessageImpl;
import org.apache.activemq.tests.unit.core.journal.impl.JournalImplTestBase;
import org.apache.activemq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.apache.activemq.tests.util.UnitTestCase;
import org.apache.activemq.tools.ExportJournal;
import org.apache.activemq.utils.IDGenerator;
import org.apache.activemq.utils.OrderedExecutorFactory;
import org.apache.activemq.utils.SimpleIDGenerator;

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.tests.integration.persistence;
import org.apache.activemq.tools.ExportJournal;
import org.apache.activemq.tools.ImportJournal;
import org.junit.Ignore;
import org.junit.Test;
@ -27,8 +29,6 @@ import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.journal.impl.ExportJournal;
import org.apache.activemq.core.journal.impl.ImportJournal;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.tests.util.ServiceTestBase;
@ -117,10 +117,12 @@ public class ExportFormatTest extends ServiceTestBase
locator.close();
server.stop();
System.out.println();
System.out.println("copy & paste the following as bindingsFile:");
ExportJournal.exportJournal(getBindingsDir(), "activemq-bindings", "bindings", 2, 1048576, System.out);
System.out.println();
System.out.println("copy & paste the following as dataFile:");
ExportJournal.exportJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, System.out);

View File

@ -66,6 +66,11 @@
<artifactId>activemq-bootstrap</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-tools</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>

View File

@ -15,6 +15,8 @@
* limitations under the License.
*/
package org.apache.activemq.tests.unit.core.journal.impl;
import org.apache.activemq.tools.ExportJournal;
import org.apache.activemq.tools.ImportJournal;
import org.junit.Before;
import org.junit.After;
@ -35,8 +37,6 @@ import org.apache.activemq.core.journal.PreparedTransactionInfo;
import org.apache.activemq.core.journal.RecordInfo;
import org.apache.activemq.core.journal.SequentialFileFactory;
import org.apache.activemq.core.journal.TestableJournal;
import org.apache.activemq.core.journal.impl.ExportJournal;
import org.apache.activemq.core.journal.impl.ImportJournal;
import org.apache.activemq.core.journal.impl.JournalImpl;
import org.apache.activemq.tests.util.UnitTestCase;
import org.apache.activemq.utils.ReusableLatch;