HBASE-9910 TestHFilePerformance and HFilePerformanceEvaluation should be merged in a single HFile performance test class (Vikas Vishwakarma)
Amending-Author: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
a7d93155f1
commit
f9cf565f1d
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -30,6 +31,10 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
||||
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
|
@ -45,6 +50,15 @@ public class HFilePerformanceEvaluation {
|
|||
private static final int ROW_LENGTH = 10;
|
||||
private static final int ROW_COUNT = 1000000;
|
||||
private static final int RFILE_BLOCKSIZE = 8 * 1024;
|
||||
private static StringBuilder testSummary = new StringBuilder();
|
||||
|
||||
// Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
|
||||
static {
|
||||
System.setProperty("org.apache.commons.logging.Log",
|
||||
"org.apache.commons.logging.impl.SimpleLog");
|
||||
System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool",
|
||||
"WARN");
|
||||
}
|
||||
|
||||
static final Log LOG =
|
||||
LogFactory.getLog(HFilePerformanceEvaluation.class.getName());
|
||||
|
@ -82,56 +96,131 @@ public class HFilePerformanceEvaluation {
|
|||
return CellUtil.createCell(keyRow, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add any supported codec or cipher to test the HFile read/write performance.
|
||||
* Specify "none" to disable codec or cipher or both.
|
||||
* @throws Exception
|
||||
*/
|
||||
private void runBenchmarks() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final FileSystem fs = FileSystem.get(conf);
|
||||
final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
|
||||
|
||||
// codec=none cipher=none
|
||||
runWriteBenchmark(conf, fs, mf, "none", "none");
|
||||
runReadBenchmark(conf, fs, mf, "none", "none");
|
||||
|
||||
// codec=gz cipher=none
|
||||
runWriteBenchmark(conf, fs, mf, "gz", "none");
|
||||
runReadBenchmark(conf, fs, mf, "gz", "none");
|
||||
|
||||
// Add configuration for AES cipher
|
||||
final Configuration aesconf = new Configuration();
|
||||
aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||
aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||
aesconf.setInt("hfile.format.version", 3);
|
||||
final FileSystem aesfs = FileSystem.get(aesconf);
|
||||
final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
|
||||
|
||||
// codec=none cipher=aes
|
||||
runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
|
||||
runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
|
||||
|
||||
// codec=gz cipher=aes
|
||||
runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
|
||||
runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
|
||||
|
||||
// cleanup test files
|
||||
if (fs.exists(mf)) {
|
||||
fs.delete(mf, true);
|
||||
}
|
||||
if (aesfs.exists(aesmf)) {
|
||||
aesfs.delete(aesmf, true);
|
||||
}
|
||||
|
||||
// Print Result Summary
|
||||
LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
|
||||
LOG.info(testSummary.toString());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a test HFile with the given codec & cipher
|
||||
* @param conf
|
||||
* @param fs
|
||||
* @param mf
|
||||
* @param codec "none", "lzo", "gz", "snappy"
|
||||
* @param cipher "none", "aes"
|
||||
* @throws Exception
|
||||
*/
|
||||
private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
|
||||
String cipher) throws Exception {
|
||||
if (fs.exists(mf)) {
|
||||
fs.delete(mf, true);
|
||||
}
|
||||
|
||||
runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
|
||||
ROW_COUNT);
|
||||
runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher),
|
||||
ROW_COUNT, codec, cipher);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Run all the read benchmarks for the test HFile
|
||||
* @param conf
|
||||
* @param fs
|
||||
* @param mf
|
||||
* @param codec "none", "lzo", "gz", "snappy"
|
||||
* @param cipher "none", "aes"
|
||||
*/
|
||||
private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
|
||||
final String codec, final String cipher) {
|
||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
|
||||
ROW_COUNT);
|
||||
ROW_COUNT, codec, cipher);
|
||||
} catch (Exception e) {
|
||||
testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
|
||||
ROW_COUNT);
|
||||
ROW_COUNT, codec, cipher);
|
||||
} catch (Exception e) {
|
||||
testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
|
||||
ROW_COUNT);
|
||||
ROW_COUNT, codec, cipher);
|
||||
} catch (Exception e) {
|
||||
testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
|
||||
ROW_COUNT);
|
||||
ROW_COUNT, codec, cipher);
|
||||
} catch (Exception e) {
|
||||
testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
@ -139,13 +228,22 @@ public class HFilePerformanceEvaluation {
|
|||
|
||||
}
|
||||
|
||||
protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
|
||||
throws Exception {
|
||||
LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
|
||||
rowCount + " rows.");
|
||||
protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount,
|
||||
String codec, String cipher) throws Exception {
|
||||
LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" +
|
||||
codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows.");
|
||||
|
||||
long elapsedTime = benchmark.run();
|
||||
LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
|
||||
rowCount + " rows took " + elapsedTime + "ms.");
|
||||
|
||||
LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" +
|
||||
codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows took " +
|
||||
elapsedTime + "ms.");
|
||||
|
||||
// Store results to print summary at the end
|
||||
testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
|
||||
.append(" with codec[").append(codec).append("] cipher[").append(cipher)
|
||||
.append("] for ").append(rowCount).append(" rows took ").append(elapsedTime)
|
||||
.append("ms.").append("\n");
|
||||
}
|
||||
|
||||
static abstract class RowOrientedBenchmark {
|
||||
|
@ -154,6 +252,18 @@ public class HFilePerformanceEvaluation {
|
|||
protected final FileSystem fs;
|
||||
protected final Path mf;
|
||||
protected final int totalRows;
|
||||
protected String codec = "none";
|
||||
protected String cipher = "none";
|
||||
|
||||
public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
|
||||
int totalRows, String codec, String cipher) {
|
||||
this.conf = conf;
|
||||
this.fs = fs;
|
||||
this.mf = mf;
|
||||
this.totalRows = totalRows;
|
||||
this.codec = codec;
|
||||
this.cipher = cipher;
|
||||
}
|
||||
|
||||
public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
|
||||
int totalRows) {
|
||||
|
@ -208,15 +318,30 @@ public class HFilePerformanceEvaluation {
|
|||
private byte[] bytes = new byte[ROW_LENGTH];
|
||||
|
||||
public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
|
||||
int totalRows) {
|
||||
super(conf, fs, mf, totalRows);
|
||||
int totalRows, String codec, String cipher) {
|
||||
super(conf, fs, mf, totalRows, codec, cipher);
|
||||
}
|
||||
|
||||
@Override
|
||||
void setUp() throws Exception {
|
||||
HFileContext hFileContext = new HFileContextBuilder().withBlockSize(RFILE_BLOCKSIZE).build();
|
||||
writer =
|
||||
HFile.getWriterFactoryNoCache(conf)
|
||||
|
||||
HFileContextBuilder builder = new HFileContextBuilder()
|
||||
.withCompression(AbstractHFileWriter.compressionByName(codec))
|
||||
.withBlockSize(RFILE_BLOCKSIZE);
|
||||
|
||||
if (cipher == "aes") {
|
||||
byte[] cipherKey = new byte[AES.KEY_LENGTH];
|
||||
new SecureRandom().nextBytes(cipherKey);
|
||||
builder.withEncryptionContext(Encryption.newContext(conf)
|
||||
.setCipher(Encryption.getCipher(conf, cipher))
|
||||
.setKey(cipherKey));
|
||||
} else if (!"none".equals(cipher)) {
|
||||
throw new IOException("Cipher " + cipher + " not supported.");
|
||||
}
|
||||
|
||||
HFileContext hFileContext = builder.build();
|
||||
|
||||
writer = HFile.getWriterFactoryNoCache(conf)
|
||||
.withPath(fs, mf)
|
||||
.withFileContext(hFileContext)
|
||||
.withComparator(new KeyValue.RawBytesComparator())
|
||||
|
|
|
@ -1,455 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.SecureRandom;
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
/**
|
||||
* Set of long-running tests to measure performance of HFile.
|
||||
* <p>
|
||||
* Copied from
|
||||
* <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
|
||||
* Remove after tfile is committed and use the tfile version of this class
|
||||
* instead.</p>
|
||||
*/
|
||||
public class TestHFilePerformance extends AbstractHBaseTool {
|
||||
private HBaseTestingUtility TEST_UTIL;
|
||||
private static String ROOT_DIR;
|
||||
private FileSystem fs;
|
||||
private long startTimeEpoch;
|
||||
private long finishTimeEpoch;
|
||||
private DateFormat formatter;
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
try {
|
||||
fs = FileSystem.get(conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||
formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFilePerformance").toString();
|
||||
}
|
||||
|
||||
public void startTime() {
|
||||
startTimeEpoch = System.currentTimeMillis();
|
||||
System.out.println(formatTime() + " Started timing.");
|
||||
}
|
||||
|
||||
public void stopTime() {
|
||||
finishTimeEpoch = System.currentTimeMillis();
|
||||
System.out.println(formatTime() + " Stopped timing.");
|
||||
}
|
||||
|
||||
public long getIntervalMillis() {
|
||||
return finishTimeEpoch - startTimeEpoch;
|
||||
}
|
||||
|
||||
public void printlnWithTimestamp(String message) {
|
||||
System.out.println(formatTime() + " " + message);
|
||||
}
|
||||
|
||||
/*
|
||||
* Format millis into minutes and seconds.
|
||||
*/
|
||||
public String formatTime(long milis){
|
||||
return formatter.format(milis);
|
||||
}
|
||||
|
||||
public String formatTime(){
|
||||
return formatTime(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
private FSDataOutputStream createFSOutput(Path name) throws IOException {
|
||||
if (fs.exists(name))
|
||||
fs.delete(name, true);
|
||||
FSDataOutputStream fout = fs.create(name);
|
||||
return fout;
|
||||
}
|
||||
|
||||
//TODO have multiple ways of generating key/value e.g. dictionary words
|
||||
//TODO to have a sample compressable data, for now, made 1 out of 3 values random
|
||||
// keys are all random.
|
||||
|
||||
private static class KeyValueGenerator {
|
||||
Random keyRandomizer;
|
||||
Random valueRandomizer;
|
||||
long randomValueRatio = 3; // 1 out of randomValueRatio generated values will be random.
|
||||
long valueSequence = 0 ;
|
||||
|
||||
|
||||
KeyValueGenerator() {
|
||||
keyRandomizer = new Random(0L); //TODO with seed zero
|
||||
valueRandomizer = new Random(1L); //TODO with seed one
|
||||
}
|
||||
|
||||
// Key is always random now.
|
||||
void getKey(byte[] key) {
|
||||
keyRandomizer.nextBytes(key);
|
||||
}
|
||||
|
||||
void getValue(byte[] value) {
|
||||
if (valueSequence % randomValueRatio == 0)
|
||||
valueRandomizer.nextBytes(value);
|
||||
valueSequence++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param fileType "HFile" or "SequenceFile"
|
||||
* @param keyLength
|
||||
* @param valueLength
|
||||
* @param codecName "none", "lzo", "gz", "snappy"
|
||||
* @param cipherName "none", "aes"
|
||||
* @param rows number of rows to be written.
|
||||
* @param writeMethod used for HFile only.
|
||||
* @param minBlockSize used for HFile only.
|
||||
* @throws IOException
|
||||
*/
|
||||
//TODO writeMethod: implement multiple ways of writing e.g. A) known length (no chunk) B) using a buffer and streaming (for many chunks).
|
||||
public void timeWrite(String fileType, int keyLength, int valueLength,
|
||||
String codecName, String cipherName, long rows, String writeMethod, int minBlockSize)
|
||||
throws IOException {
|
||||
System.out.println("File Type: " + fileType);
|
||||
System.out.println("Writing " + fileType + " with codecName: " + codecName +
|
||||
" cipherName: " + cipherName);
|
||||
long totalBytesWritten = 0;
|
||||
|
||||
|
||||
//Using separate randomizer for key/value with seeds matching Sequence File.
|
||||
byte[] key = new byte[keyLength];
|
||||
byte[] value = new byte[valueLength];
|
||||
KeyValueGenerator generator = new KeyValueGenerator();
|
||||
|
||||
startTime();
|
||||
|
||||
Path path = new Path(ROOT_DIR, fileType + ".Performance");
|
||||
System.out.println(ROOT_DIR + Path.SEPARATOR + path.getName());
|
||||
FSDataOutputStream fout = createFSOutput(path);
|
||||
|
||||
if ("HFile".equals(fileType)){
|
||||
HFileContextBuilder builder = new HFileContextBuilder()
|
||||
.withCompression(AbstractHFileWriter.compressionByName(codecName))
|
||||
.withBlockSize(minBlockSize);
|
||||
if (cipherName != "none") {
|
||||
byte[] cipherKey = new byte[AES.KEY_LENGTH];
|
||||
new SecureRandom().nextBytes(cipherKey);
|
||||
builder.withEncryptionContext(
|
||||
Encryption.newContext(conf)
|
||||
.setCipher(Encryption.getCipher(conf, cipherName))
|
||||
.setKey(cipherKey));
|
||||
}
|
||||
HFileContext context = builder.build();
|
||||
System.out.println("HFile write method: ");
|
||||
HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
|
||||
.withOutputStream(fout)
|
||||
.withFileContext(context)
|
||||
.withComparator(new KeyValue.RawBytesComparator())
|
||||
.create();
|
||||
|
||||
// Writing value in one shot.
|
||||
for (long l=0; l<rows; l++ ) {
|
||||
generator.getKey(key);
|
||||
generator.getValue(value);
|
||||
writer.append(CellUtil.createCell(key, value));
|
||||
totalBytesWritten += key.length;
|
||||
totalBytesWritten += value.length;
|
||||
}
|
||||
writer.close();
|
||||
} else if ("SequenceFile".equals(fileType)){
|
||||
CompressionCodec codec = null;
|
||||
if ("gz".equals(codecName))
|
||||
codec = new GzipCodec();
|
||||
else if (!"none".equals(codecName))
|
||||
throw new IOException("Codec not supported.");
|
||||
|
||||
SequenceFile.Writer writer;
|
||||
|
||||
//TODO
|
||||
//JobConf conf = new JobConf();
|
||||
|
||||
if (!"none".equals(codecName))
|
||||
writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
|
||||
BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
|
||||
else
|
||||
writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
|
||||
BytesWritable.class, SequenceFile.CompressionType.NONE, null);
|
||||
|
||||
BytesWritable keyBsw;
|
||||
BytesWritable valBsw;
|
||||
for (long l=0; l<rows; l++ ) {
|
||||
|
||||
generator.getKey(key);
|
||||
keyBsw = new BytesWritable(key);
|
||||
totalBytesWritten += keyBsw.getSize();
|
||||
|
||||
generator.getValue(value);
|
||||
valBsw = new BytesWritable(value);
|
||||
writer.append(keyBsw, valBsw);
|
||||
totalBytesWritten += valBsw.getSize();
|
||||
}
|
||||
|
||||
writer.close();
|
||||
} else
|
||||
throw new IOException("File Type is not supported");
|
||||
|
||||
fout.close();
|
||||
stopTime();
|
||||
|
||||
printlnWithTimestamp("Data written: ");
|
||||
printlnWithTimestamp(" rate = " +
|
||||
totalBytesWritten / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
||||
printlnWithTimestamp(" total = " + totalBytesWritten + "B");
|
||||
|
||||
printlnWithTimestamp("File written: ");
|
||||
printlnWithTimestamp(" rate = " +
|
||||
fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
||||
printlnWithTimestamp(" total = " + fs.getFileStatus(path).getLen() + "B");
|
||||
}
|
||||
|
||||
public void timeReading(String fileType, int keyLength, int valueLength,
|
||||
long rows, int method) throws IOException {
|
||||
System.out.println("Reading file of type: " + fileType);
|
||||
Path path = new Path(ROOT_DIR, fileType + ".Performance");
|
||||
System.out.println("Input file size: " + fs.getFileStatus(path).getLen());
|
||||
long totalBytesRead = 0;
|
||||
|
||||
|
||||
ByteBuffer val;
|
||||
|
||||
ByteBuffer key;
|
||||
|
||||
startTime();
|
||||
FSDataInputStream fin = fs.open(path);
|
||||
|
||||
if ("HFile".equals(fileType)){
|
||||
HFile.Reader reader = HFile.createReaderFromStream(path, fs.open(path),
|
||||
fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
|
||||
reader.loadFileInfo();
|
||||
switch (method) {
|
||||
|
||||
case 0:
|
||||
case 1:
|
||||
default:
|
||||
{
|
||||
HFileScanner scanner = reader.getScanner(false, false);
|
||||
scanner.seekTo();
|
||||
for (long l=0; l<rows; l++ ) {
|
||||
key = scanner.getKey();
|
||||
val = scanner.getValue();
|
||||
totalBytesRead += key.limit() + val.limit();
|
||||
scanner.next();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
reader.close();
|
||||
} else if("SequenceFile".equals(fileType)){
|
||||
|
||||
SequenceFile.Reader reader;
|
||||
reader = new SequenceFile.Reader(fs, path, new Configuration());
|
||||
|
||||
if (reader.getCompressionCodec() != null) {
|
||||
printlnWithTimestamp("Compression codec class: " + reader.getCompressionCodec().getClass());
|
||||
} else
|
||||
printlnWithTimestamp("Compression codec class: " + "none");
|
||||
|
||||
BytesWritable keyBsw = new BytesWritable();
|
||||
BytesWritable valBsw = new BytesWritable();
|
||||
|
||||
for (long l=0; l<rows; l++ ) {
|
||||
reader.next(keyBsw, valBsw);
|
||||
totalBytesRead += keyBsw.getSize() + valBsw.getSize();
|
||||
}
|
||||
reader.close();
|
||||
|
||||
//TODO make a tests for other types of SequenceFile reading scenarios
|
||||
|
||||
} else {
|
||||
throw new IOException("File Type not supported.");
|
||||
}
|
||||
|
||||
|
||||
//printlnWithTimestamp("Closing reader");
|
||||
fin.close();
|
||||
stopTime();
|
||||
//printlnWithTimestamp("Finished close");
|
||||
|
||||
printlnWithTimestamp("Finished in " + getIntervalMillis() + "ms");
|
||||
printlnWithTimestamp("Data read: ");
|
||||
printlnWithTimestamp(" rate = " +
|
||||
totalBytesRead / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
||||
printlnWithTimestamp(" total = " + totalBytesRead + "B");
|
||||
|
||||
printlnWithTimestamp("File read: ");
|
||||
printlnWithTimestamp(" rate = " +
|
||||
fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
||||
printlnWithTimestamp(" total = " + fs.getFileStatus(path).getLen() + "B");
|
||||
|
||||
//TODO uncomment this for final committing so test files is removed.
|
||||
//fs.delete(path, true);
|
||||
}
|
||||
|
||||
public void testRunComparisons() throws IOException {
|
||||
|
||||
int keyLength = 100; // 100B
|
||||
int valueLength = 5*1024; // 5KB
|
||||
int minBlockSize = 10*1024*1024; // 10MB
|
||||
int rows = 10000;
|
||||
|
||||
System.out.println("****************************** Sequence File *****************************");
|
||||
|
||||
timeWrite("SequenceFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("SequenceFile", keyLength, valueLength, rows, -1);
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("----------------------");
|
||||
System.out.println("");
|
||||
|
||||
/* DISABLED LZO
|
||||
timeWrite("SequenceFile", keyLength, valueLength, "lzo", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("SequenceFile", keyLength, valueLength, rows, -1);
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("----------------------");
|
||||
System.out.println("");
|
||||
|
||||
/* Sequence file can only use native hadoop libs gzipping so commenting out.
|
||||
*/
|
||||
try {
|
||||
timeWrite("SequenceFile", keyLength, valueLength, "gz", "none", rows, null,
|
||||
minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("SequenceFile", keyLength, valueLength, rows, -1);
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("Skipping sequencefile gz: " + e.getMessage());
|
||||
}
|
||||
|
||||
|
||||
System.out.println("\n\n\n");
|
||||
System.out.println("****************************** HFile *****************************");
|
||||
|
||||
timeWrite("HFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("----------------------");
|
||||
System.out.println("");
|
||||
|
||||
timeWrite("HFile", keyLength, valueLength, "none", "aes", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("----------------------");
|
||||
System.out.println("");
|
||||
|
||||
/* DISABLED LZO
|
||||
timeWrite("HFile", keyLength, valueLength, "lzo", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 1 );
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 2 );
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("----------------------");
|
||||
System.out.println("");
|
||||
*/
|
||||
|
||||
timeWrite("HFile", keyLength, valueLength, "gz", "none", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("----------------------");
|
||||
System.out.println("");
|
||||
|
||||
timeWrite("HFile", keyLength, valueLength, "gz", "aes", rows, null, minBlockSize);
|
||||
System.out.println("\n+++++++\n");
|
||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
||||
|
||||
System.out.println("\n\n\n\nNotes: ");
|
||||
System.out.println(" * Timing includes open/closing of files.");
|
||||
System.out.println(" * Timing includes reading both Key and Value");
|
||||
System.out.println(" * Data is generated as random bytes. Other methods e.g. using " +
|
||||
"dictionary with care for distributation of words is under development.");
|
||||
System.out.println(" * Timing of write currently, includes random value/key generations. " +
|
||||
"Which is the same for Sequence File and HFile. Another possibility is to generate " +
|
||||
"test data beforehand");
|
||||
System.out.println(" * We need to mitigate cache effect on benchmark. We can apply several " +
|
||||
"ideas, for next step we do a large dummy read between benchmark read to dismantle " +
|
||||
"caching of data. Renaming of file may be helpful. We can have a loop that reads with" +
|
||||
" the same method several times and flood cache every time and average it to get a" +
|
||||
" better number.");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addOptions() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processOptions(CommandLine cmd) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWork() throws Exception {
|
||||
testRunComparisons();
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
int ret = ToolRunner.run(HBaseConfiguration.create(), new TestHFilePerformance(), args);
|
||||
System.exit(ret);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue