HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job

HBase 2 moved Scans over to PREAD by default, where HBase 1 used STREAM. In the context of a MapReduce job, we can generally expect that clients using the InputFormat (a batch job) will read most of the data for the job. Cater to them by defaulting to STREAM, but still give users who want PREAD the ability to request it.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
parent  e139d31612
commit  e8d62139d5
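A minimal sketch of the two ways a job can still get PREAD after this change. The configuration key and the ReadType values are taken from the diff below; the class wrapper and setup are hypothetical, not code from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Scan.ReadType;

    public class PreadOptIn {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();

        // Option 1: set an explicit ReadType on the Scan. The InputFormat only
        // rewrites scans left at ReadType.DEFAULT, so this survives untouched.
        Scan scan = new Scan();
        scan.setReadType(ReadType.PREAD);

        // Option 2: leave the Scan at DEFAULT and override the InputFormat-wide
        // fallback (which is STREAM when this key is unset).
        conf.setEnum("hbase.TableSnapshotInputFormat.scanner.readtype", ReadType.PREAD);
      }
    }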
TableSnapshotInputFormatImpl.java:

@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -128,6 +129,14 @@ public class TableSnapshotInputFormatImpl {
 
   public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;
 
+  /**
+   * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot,
+   * default STREAM.
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
+      "hbase.TableSnapshotInputFormat.scanner.readtype";
+  public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
+
   /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
@@ -382,6 +391,15 @@ public class TableSnapshotInputFormatImpl {
     } else {
       throw new IllegalArgumentException("Unable to create scan");
     }
+
+    if (scan.getReadType() == ReadType.DEFAULT) {
+      LOG.info("Provided Scan has DEFAULT ReadType,"
+          + " updating STREAM for Snapshot-based InputFormat");
+      // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
+      scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
+          SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
+    }
+
     return scan;
   }
 
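To place the scan-handling hunk above in context (extractScanFromConf, as the test below calls it), here is a hedged sketch of a snapshot-based job setup. The job name, snapshot name, mapper, and restore directory are hypothetical placeholders; initTableSnapshotMapperJob is the standard entry point that serializes the Scan which this method later deserializes:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotJobSketch {
      // Hypothetical no-op mapper, only here to make the sketch self-contained.
      public static class MyMapper extends TableMapper<ImmutableBytesWritable, Result> {
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "snapshot-scan-job"); // hypothetical job name
        // Scan left at ReadType.DEFAULT: the patched code will switch it to STREAM
        // (or to whatever hbase.TableSnapshotInputFormat.scanner.readtype says).
        Scan scan = new Scan();
        // Serializes the Scan into the job configuration; at split/record-reader
        // time, TableSnapshotInputFormatImpl.extractScanFromConf deserializes it
        // and applies the DEFAULT -> STREAM substitution added above.
        TableMapReduceUtil.initTableSnapshotMapperJob(
            "my_snapshot",                        // hypothetical snapshot name
            scan,
            MyMapper.class,                       // hypothetical mapper class
            ImmutableBytesWritable.class,
            Result.class,
            job,
            true,                                 // add dependency jars
            new Path("/tmp/snapshot-restore"));   // hypothetical restore directory
      }
    }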
TestTableSnapshotInputFormat.java:

@@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNA
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -407,6 +410,36 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
     }
   }
 
+  @Test
+  public void testScannerReadTypeConfiguration() throws IOException {
+    Configuration conf = new Configuration(false);
+    // Explicitly set ReadTypes should persist
+    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
+      Scan scanWithReadType = new Scan();
+      scanWithReadType.setReadType(readType);
+      assertEquals(scanWithReadType.getReadType(),
+          serializeAndReturn(conf, scanWithReadType).getReadType());
+    }
+    // We should only see the DEFAULT ReadType getting updated to STREAM.
+    Scan scanWithoutReadType = new Scan();
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+
+    // We should still be able to force a certain ReadType when DEFAULT is given.
+    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+  }
+
+  /**
+   * Serializes and deserializes the given scan in the same manner that
+   * TableSnapshotInputFormat does.
+   */
+  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
+    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
+    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
+  }
+
   private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
       byte[] startRow, byte[] stopRow)
       throws IOException, InterruptedException {
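The test above hinges on Hadoop's Configuration enum helpers. A standalone sketch of that fallback behavior; the key string is the one added by this patch, the class wrapper is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan.ReadType;

    public class GetEnumFallback {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        String key = "hbase.TableSnapshotInputFormat.scanner.readtype";
        // Key unset: getEnum returns the supplied default, so scans fall back to STREAM.
        System.out.println(conf.getEnum(key, ReadType.STREAM)); // prints STREAM
        // Key set: the configured value wins, letting users force PREAD.
        conf.setEnum(key, ReadType.PREAD);
        System.out.println(conf.getEnum(key, ReadType.STREAM)); // prints PREAD
      }
    }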