HBASE-9895: 0.96 Import utility can't import an exported file from 0.94
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1540926 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ec64a1dc0
commit
477072f7f0
|
@ -451,6 +451,10 @@ public class Import {
|
||||||
usage("Wrong number of arguments: " + otherArgs.length);
|
usage("Wrong number of arguments: " + otherArgs.length);
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
|
String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
|
||||||
|
if (inputVersionString != null) {
|
||||||
|
conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
|
||||||
|
}
|
||||||
Job job = createSubmittableJob(conf, otherArgs);
|
Job job = createSubmittableJob(conf, otherArgs);
|
||||||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,18 +17,33 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.serializer.Deserializer;
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
import org.apache.hadoop.io.serializer.Serialization;
|
import org.apache.hadoop.io.serializer.Serialization;
|
||||||
import org.apache.hadoop.io.serializer.Serializer;
|
import org.apache.hadoop.io.serializer.Serializer;
|
||||||
|
|
||||||
public class ResultSerialization implements Serialization<Result> {
|
public class ResultSerialization extends Configured implements Serialization<Result> {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ResultSerialization.class);
|
||||||
|
// The following configuration property indicates import file format version.
|
||||||
|
public static final String IMPORT_FORMAT_VER = "hbase.import.version";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Class<?> c) {
|
public boolean accept(Class<?> c) {
|
||||||
return Result.class.isAssignableFrom(c);
|
return Result.class.isAssignableFrom(c);
|
||||||
|
@ -36,6 +51,16 @@ public class ResultSerialization implements Serialization<Result> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Deserializer<Result> getDeserializer(Class<Result> c) {
|
public Deserializer<Result> getDeserializer(Class<Result> c) {
|
||||||
|
// check input format version
|
||||||
|
Configuration conf = getConf();
|
||||||
|
if (conf != null) {
|
||||||
|
String inputVersion = conf.get(IMPORT_FORMAT_VER);
|
||||||
|
if (inputVersion != null && inputVersion.equals("0.94")) {
|
||||||
|
LOG.info("Load exported file using deserializer for HBase 0.94 format");
|
||||||
|
return new Result94Deserializer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return new ResultDeserializer();
|
return new ResultDeserializer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +69,52 @@ public class ResultSerialization implements Serialization<Result> {
|
||||||
return new ResultSerializer();
|
return new ResultSerializer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The following deserializer class is used to load exported file of 0.94
|
||||||
|
*/
|
||||||
|
private static class Result94Deserializer implements Deserializer<Result> {
|
||||||
|
private DataInputStream in;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result deserialize(Result mutation) throws IOException {
|
||||||
|
int totalBuffer = in.readInt();
|
||||||
|
if (totalBuffer == 0) {
|
||||||
|
return Result.EMPTY_RESULT;
|
||||||
|
}
|
||||||
|
byte[] buf = new byte[totalBuffer];
|
||||||
|
readChunked(in, buf, 0, totalBuffer);
|
||||||
|
List<Cell> kvs = new ArrayList<Cell>();
|
||||||
|
int offset = 0;
|
||||||
|
while (offset < totalBuffer) {
|
||||||
|
int keyLength = Bytes.toInt(buf, offset);
|
||||||
|
offset += Bytes.SIZEOF_INT;
|
||||||
|
kvs.add(new KeyValue(buf, offset, keyLength));
|
||||||
|
offset += keyLength;
|
||||||
|
}
|
||||||
|
return Result.create(kvs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(InputStream in) throws IOException {
|
||||||
|
if (!(in instanceof DataInputStream)) {
|
||||||
|
throw new IOException("Wrong input stream instance passed in");
|
||||||
|
}
|
||||||
|
this.in = (DataInputStream) in;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException {
|
||||||
|
int maxRead = 8192;
|
||||||
|
|
||||||
|
for (; ofs < len; ofs += maxRead)
|
||||||
|
in.readFully(dest, ofs, Math.min(len - ofs, maxRead));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class ResultDeserializer implements Deserializer<Result> {
|
private static class ResultDeserializer implements Deserializer<Result> {
|
||||||
private InputStream in;
|
private InputStream in;
|
||||||
|
|
||||||
|
@ -54,8 +125,7 @@ public class ResultSerialization implements Serialization<Result> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result deserialize(Result mutation) throws IOException {
|
public Result deserialize(Result mutation) throws IOException {
|
||||||
ClientProtos.Result proto =
|
ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in);
|
||||||
ClientProtos.Result.parseDelimitedFrom(in);
|
|
||||||
return ProtobufUtil.toResult(proto);
|
return ProtobufUtil.toResult(proto);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,8 +133,8 @@ public class ResultSerialization implements Serialization<Result> {
|
||||||
public void open(InputStream in) throws IOException {
|
public void open(InputStream in) throws IOException {
|
||||||
this.in = in;
|
this.in = in;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ResultSerializer implements Serializer<Result> {
|
private static class ResultSerializer implements Serializer<Result> {
|
||||||
private OutputStream out;
|
private OutputStream out;
|
||||||
|
|
||||||
|
|
|
@ -21,10 +21,15 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -66,8 +71,6 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import static org.mockito.Mockito.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the table import and table export MR job functionality
|
* Tests the table import and table export MR job functionality
|
||||||
*/
|
*/
|
||||||
|
@ -190,7 +193,7 @@ public class TestImportExport {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test export hbase:meta table
|
* Test export hbase:meta table
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -200,6 +203,38 @@ public class TestImportExport {
|
||||||
assertTrue(runExport(args));
|
assertTrue(runExport(args));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test import data from 0.94 exported file
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testImport94Table() throws Exception {
|
||||||
|
URL url = TestImportExport.class.getResource(
|
||||||
|
"exportedTableIn94Format");
|
||||||
|
Path importPath = new Path(url.getPath());
|
||||||
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
|
fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
|
||||||
|
+ "exportedTableIn94Format"));
|
||||||
|
String IMPORT_TABLE = "importTableExportedFrom94";
|
||||||
|
HTable t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
|
||||||
|
String[] args = new String[] {
|
||||||
|
"-Dhbase.import.version=0.94" ,
|
||||||
|
IMPORT_TABLE, FQ_OUTPUT_DIR
|
||||||
|
};
|
||||||
|
assertTrue(runImport(args));
|
||||||
|
|
||||||
|
/* exportedTableIn94Format contains 5 rows
|
||||||
|
ROW COLUMN+CELL
|
||||||
|
r1 column=f1:c1, timestamp=1383766761171, value=val1
|
||||||
|
r2 column=f1:c1, timestamp=1383766771642, value=val2
|
||||||
|
r3 column=f1:c1, timestamp=1383766777615, value=val3
|
||||||
|
r4 column=f1:c1, timestamp=1383766785146, value=val4
|
||||||
|
r5 column=f1:c1, timestamp=1383766791506, value=val5
|
||||||
|
*/
|
||||||
|
assertEquals(5, UTIL.countRows(t));
|
||||||
|
t.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test export scanner batching
|
* Test export scanner batching
|
||||||
*/
|
*/
|
||||||
|
|
Binary file not shown.
Loading…
Reference in New Issue