HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job
HBase 2 moved over Scans to use PREAD by default instead of STREAM like HBase 1. In the context of a MapReduce job, we can generally expect that clients using the InputFormat (batch job) would be reading most of the data for a job. Cater to them, but still give users who want PREAD the ability to do so. Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
This commit is contained in:
parent
d26bcaaa91
commit
86bc640c17
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -128,6 +129,14 @@ public class TableSnapshotInputFormatImpl {
|
|||
|
||||
public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;
|
||||
|
||||
/**
|
||||
* The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot,
|
||||
* default STREAM.
|
||||
*/
|
||||
public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
|
||||
"hbase.TableSnapshotInputFormat.scanner.readtype";
|
||||
public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
|
||||
|
||||
/**
|
||||
* Implementation class for InputSplit logic common between mapred and mapreduce.
|
||||
*/
|
||||
|
@ -382,6 +391,15 @@ public class TableSnapshotInputFormatImpl {
|
|||
} else {
|
||||
throw new IllegalArgumentException("Unable to create scan");
|
||||
}
|
||||
|
||||
if (scan.getReadType() == ReadType.DEFAULT) {
|
||||
LOG.info("Provided Scan has DEFAULT ReadType,"
|
||||
+ " updating STREAM for Snapshot-based InputFormat");
|
||||
// Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
|
||||
scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
|
||||
SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
|
||||
}
|
||||
|
||||
return scan;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNA
|
|||
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
|
||||
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
|
||||
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
|
@ -407,6 +410,36 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScannerReadTypeConfiguration() throws IOException {
|
||||
Configuration conf = new Configuration(false);
|
||||
// Explicitly set ReadTypes should persist
|
||||
for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
|
||||
Scan scanWithReadType = new Scan();
|
||||
scanWithReadType.setReadType(readType);
|
||||
assertEquals(scanWithReadType.getReadType(),
|
||||
serializeAndReturn(conf, scanWithReadType).getReadType());
|
||||
}
|
||||
// We should only see the DEFAULT ReadType getting updated to STREAM.
|
||||
Scan scanWithoutReadType = new Scan();
|
||||
assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
|
||||
assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());
|
||||
|
||||
// We should still be able to force a certain ReadType when DEFAULT is given.
|
||||
conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
|
||||
assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
|
||||
assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes and deserializes the given scan in the same manner that
|
||||
* TableSnapshotInputFormat does.
|
||||
*/
|
||||
private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
|
||||
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
|
||||
return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
|
||||
}
|
||||
|
||||
private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
|
||||
byte[] startRow, byte[] stopRow)
|
||||
throws IOException, InterruptedException {
|
||||
|
|
Loading…
Reference in New Issue