HBASE-9910 TestHFilePerformance and HFilePerformanceEvaluation should be merged in a single HFile performance test class (Vikas Vishwakarma)
Amending-Author: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
e0dbc0b55f
commit
8dd17e1ff8
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.SecureRandom;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -30,6 +31,10 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||||
|
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
@ -45,6 +50,15 @@ public class HFilePerformanceEvaluation {
|
||||||
private static final int ROW_LENGTH = 10;
|
private static final int ROW_LENGTH = 10;
|
||||||
private static final int ROW_COUNT = 1000000;
|
private static final int ROW_COUNT = 1000000;
|
||||||
private static final int RFILE_BLOCKSIZE = 8 * 1024;
|
private static final int RFILE_BLOCKSIZE = 8 * 1024;
|
||||||
|
private static StringBuilder testSummary = new StringBuilder();
|
||||||
|
|
||||||
|
// Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
|
||||||
|
static {
|
||||||
|
System.setProperty("org.apache.commons.logging.Log",
|
||||||
|
"org.apache.commons.logging.impl.SimpleLog");
|
||||||
|
System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool",
|
||||||
|
"WARN");
|
||||||
|
}
|
||||||
|
|
||||||
static final Log LOG =
|
static final Log LOG =
|
||||||
LogFactory.getLog(HFilePerformanceEvaluation.class.getName());
|
LogFactory.getLog(HFilePerformanceEvaluation.class.getName());
|
||||||
|
@ -82,56 +96,131 @@ public class HFilePerformanceEvaluation {
|
||||||
return CellUtil.createCell(keyRow, value);
|
return CellUtil.createCell(keyRow, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add any supported codec or cipher to test the HFile read/write performance.
|
||||||
|
* Specify "none" to disable codec or cipher or both.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
private void runBenchmarks() throws Exception {
|
private void runBenchmarks() throws Exception {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
final FileSystem fs = FileSystem.get(conf);
|
final FileSystem fs = FileSystem.get(conf);
|
||||||
final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
|
final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
|
||||||
|
|
||||||
|
// codec=none cipher=none
|
||||||
|
runWriteBenchmark(conf, fs, mf, "none", "none");
|
||||||
|
runReadBenchmark(conf, fs, mf, "none", "none");
|
||||||
|
|
||||||
|
// codec=gz cipher=none
|
||||||
|
runWriteBenchmark(conf, fs, mf, "gz", "none");
|
||||||
|
runReadBenchmark(conf, fs, mf, "gz", "none");
|
||||||
|
|
||||||
|
// Add configuration for AES cipher
|
||||||
|
final Configuration aesconf = new Configuration();
|
||||||
|
aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
||||||
|
aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||||
|
aesconf.setInt("hfile.format.version", 3);
|
||||||
|
final FileSystem aesfs = FileSystem.get(aesconf);
|
||||||
|
final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
|
||||||
|
|
||||||
|
// codec=none cipher=aes
|
||||||
|
runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
|
||||||
|
runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
|
||||||
|
|
||||||
|
// codec=gz cipher=aes
|
||||||
|
runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
|
||||||
|
runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
|
||||||
|
|
||||||
|
// cleanup test files
|
||||||
|
if (fs.exists(mf)) {
|
||||||
|
fs.delete(mf, true);
|
||||||
|
}
|
||||||
|
if (aesfs.exists(aesmf)) {
|
||||||
|
aesfs.delete(aesmf, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print Result Summary
|
||||||
|
LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
|
||||||
|
LOG.info(testSummary.toString());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write a test HFile with the given codec & cipher
|
||||||
|
* @param conf
|
||||||
|
* @param fs
|
||||||
|
* @param mf
|
||||||
|
* @param codec "none", "lzo", "gz", "snappy"
|
||||||
|
* @param cipher "none", "aes"
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
|
||||||
|
String cipher) throws Exception {
|
||||||
if (fs.exists(mf)) {
|
if (fs.exists(mf)) {
|
||||||
fs.delete(mf, true);
|
fs.delete(mf, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
|
runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher),
|
||||||
ROW_COUNT);
|
ROW_COUNT, codec, cipher);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run all the read benchmarks for the test HFile
|
||||||
|
* @param conf
|
||||||
|
* @param fs
|
||||||
|
* @param mf
|
||||||
|
* @param codec "none", "lzo", "gz", "snappy"
|
||||||
|
* @param cipher "none", "aes"
|
||||||
|
*/
|
||||||
|
private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
|
||||||
|
final String codec, final String cipher) {
|
||||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
|
runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
|
||||||
ROW_COUNT);
|
ROW_COUNT, codec, cipher);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
|
runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
|
||||||
ROW_COUNT);
|
ROW_COUNT, codec, cipher);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
|
runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
|
||||||
ROW_COUNT);
|
ROW_COUNT, codec, cipher);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
PerformanceEvaluationCommons.concurrentReads(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
|
runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
|
||||||
ROW_COUNT);
|
ROW_COUNT, codec, cipher);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,13 +228,22 @@ public class HFilePerformanceEvaluation {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
|
protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount,
|
||||||
throws Exception {
|
String codec, String cipher) throws Exception {
|
||||||
LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
|
LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" +
|
||||||
rowCount + " rows.");
|
codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows.");
|
||||||
|
|
||||||
long elapsedTime = benchmark.run();
|
long elapsedTime = benchmark.run();
|
||||||
LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
|
|
||||||
rowCount + " rows took " + elapsedTime + "ms.");
|
LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" +
|
||||||
|
codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows took " +
|
||||||
|
elapsedTime + "ms.");
|
||||||
|
|
||||||
|
// Store results to print summary at the end
|
||||||
|
testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
|
||||||
|
.append(" with codec[").append(codec).append("] cipher[").append(cipher)
|
||||||
|
.append("] for ").append(rowCount).append(" rows took ").append(elapsedTime)
|
||||||
|
.append("ms.").append("\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
static abstract class RowOrientedBenchmark {
|
static abstract class RowOrientedBenchmark {
|
||||||
|
@ -154,6 +252,18 @@ public class HFilePerformanceEvaluation {
|
||||||
protected final FileSystem fs;
|
protected final FileSystem fs;
|
||||||
protected final Path mf;
|
protected final Path mf;
|
||||||
protected final int totalRows;
|
protected final int totalRows;
|
||||||
|
protected String codec = "none";
|
||||||
|
protected String cipher = "none";
|
||||||
|
|
||||||
|
public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
|
||||||
|
int totalRows, String codec, String cipher) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.fs = fs;
|
||||||
|
this.mf = mf;
|
||||||
|
this.totalRows = totalRows;
|
||||||
|
this.codec = codec;
|
||||||
|
this.cipher = cipher;
|
||||||
|
}
|
||||||
|
|
||||||
public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
|
public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
|
||||||
int totalRows) {
|
int totalRows) {
|
||||||
|
@ -208,19 +318,34 @@ public class HFilePerformanceEvaluation {
|
||||||
private byte[] bytes = new byte[ROW_LENGTH];
|
private byte[] bytes = new byte[ROW_LENGTH];
|
||||||
|
|
||||||
public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
|
public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
|
||||||
int totalRows) {
|
int totalRows, String codec, String cipher) {
|
||||||
super(conf, fs, mf, totalRows);
|
super(conf, fs, mf, totalRows, codec, cipher);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void setUp() throws Exception {
|
void setUp() throws Exception {
|
||||||
HFileContext hFileContext = new HFileContextBuilder().withBlockSize(RFILE_BLOCKSIZE).build();
|
|
||||||
writer =
|
HFileContextBuilder builder = new HFileContextBuilder()
|
||||||
HFile.getWriterFactoryNoCache(conf)
|
.withCompression(AbstractHFileWriter.compressionByName(codec))
|
||||||
.withPath(fs, mf)
|
.withBlockSize(RFILE_BLOCKSIZE);
|
||||||
.withFileContext(hFileContext)
|
|
||||||
.withComparator(new KeyValue.RawBytesComparator())
|
if (cipher == "aes") {
|
||||||
.create();
|
byte[] cipherKey = new byte[AES.KEY_LENGTH];
|
||||||
|
new SecureRandom().nextBytes(cipherKey);
|
||||||
|
builder.withEncryptionContext(Encryption.newContext(conf)
|
||||||
|
.setCipher(Encryption.getCipher(conf, cipher))
|
||||||
|
.setKey(cipherKey));
|
||||||
|
} else if (!"none".equals(cipher)) {
|
||||||
|
throw new IOException("Cipher " + cipher + " not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
HFileContext hFileContext = builder.build();
|
||||||
|
|
||||||
|
writer = HFile.getWriterFactoryNoCache(conf)
|
||||||
|
.withPath(fs, mf)
|
||||||
|
.withFileContext(hFileContext)
|
||||||
|
.withComparator(new KeyValue.RawBytesComparator())
|
||||||
|
.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,455 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.io.hfile;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.security.SecureRandom;
|
|
||||||
import java.text.DateFormat;
|
|
||||||
import java.text.SimpleDateFormat;
|
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
|
||||||
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
|
||||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
|
||||||
import org.apache.hadoop.io.BytesWritable;
|
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set of long-running tests to measure performance of HFile.
|
|
||||||
* <p>
|
|
||||||
* Copied from
|
|
||||||
* <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
|
|
||||||
* Remove after tfile is committed and use the tfile version of this class
|
|
||||||
* instead.</p>
|
|
||||||
*/
|
|
||||||
public class TestHFilePerformance extends AbstractHBaseTool {
|
|
||||||
private HBaseTestingUtility TEST_UTIL;
|
|
||||||
private static String ROOT_DIR;
|
|
||||||
private FileSystem fs;
|
|
||||||
private long startTimeEpoch;
|
|
||||||
private long finishTimeEpoch;
|
|
||||||
private DateFormat formatter;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setConf(Configuration conf) {
|
|
||||||
super.setConf(conf);
|
|
||||||
try {
|
|
||||||
fs = FileSystem.get(conf);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
|
|
||||||
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
|
||||||
formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
||||||
TEST_UTIL = new HBaseTestingUtility(conf);
|
|
||||||
ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFilePerformance").toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void startTime() {
|
|
||||||
startTimeEpoch = System.currentTimeMillis();
|
|
||||||
System.out.println(formatTime() + " Started timing.");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stopTime() {
|
|
||||||
finishTimeEpoch = System.currentTimeMillis();
|
|
||||||
System.out.println(formatTime() + " Stopped timing.");
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getIntervalMillis() {
|
|
||||||
return finishTimeEpoch - startTimeEpoch;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void printlnWithTimestamp(String message) {
|
|
||||||
System.out.println(formatTime() + " " + message);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Format millis into minutes and seconds.
|
|
||||||
*/
|
|
||||||
public String formatTime(long milis){
|
|
||||||
return formatter.format(milis);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String formatTime(){
|
|
||||||
return formatTime(System.currentTimeMillis());
|
|
||||||
}
|
|
||||||
|
|
||||||
private FSDataOutputStream createFSOutput(Path name) throws IOException {
|
|
||||||
if (fs.exists(name))
|
|
||||||
fs.delete(name, true);
|
|
||||||
FSDataOutputStream fout = fs.create(name);
|
|
||||||
return fout;
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO have multiple ways of generating key/value e.g. dictionary words
|
|
||||||
//TODO to have a sample compressable data, for now, made 1 out of 3 values random
|
|
||||||
// keys are all random.
|
|
||||||
|
|
||||||
private static class KeyValueGenerator {
|
|
||||||
Random keyRandomizer;
|
|
||||||
Random valueRandomizer;
|
|
||||||
long randomValueRatio = 3; // 1 out of randomValueRatio generated values will be random.
|
|
||||||
long valueSequence = 0 ;
|
|
||||||
|
|
||||||
|
|
||||||
KeyValueGenerator() {
|
|
||||||
keyRandomizer = new Random(0L); //TODO with seed zero
|
|
||||||
valueRandomizer = new Random(1L); //TODO with seed one
|
|
||||||
}
|
|
||||||
|
|
||||||
// Key is always random now.
|
|
||||||
void getKey(byte[] key) {
|
|
||||||
keyRandomizer.nextBytes(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
void getValue(byte[] value) {
|
|
||||||
if (valueSequence % randomValueRatio == 0)
|
|
||||||
valueRandomizer.nextBytes(value);
|
|
||||||
valueSequence++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
* @param fileType "HFile" or "SequenceFile"
|
|
||||||
* @param keyLength
|
|
||||||
* @param valueLength
|
|
||||||
* @param codecName "none", "lzo", "gz", "snappy"
|
|
||||||
* @param cipherName "none", "aes"
|
|
||||||
* @param rows number of rows to be written.
|
|
||||||
* @param writeMethod used for HFile only.
|
|
||||||
* @param minBlockSize used for HFile only.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
//TODO writeMethod: implement multiple ways of writing e.g. A) known length (no chunk) B) using a buffer and streaming (for many chunks).
|
|
||||||
public void timeWrite(String fileType, int keyLength, int valueLength,
|
|
||||||
String codecName, String cipherName, long rows, String writeMethod, int minBlockSize)
|
|
||||||
throws IOException {
|
|
||||||
System.out.println("File Type: " + fileType);
|
|
||||||
System.out.println("Writing " + fileType + " with codecName: " + codecName +
|
|
||||||
" cipherName: " + cipherName);
|
|
||||||
long totalBytesWritten = 0;
|
|
||||||
|
|
||||||
|
|
||||||
//Using separate randomizer for key/value with seeds matching Sequence File.
|
|
||||||
byte[] key = new byte[keyLength];
|
|
||||||
byte[] value = new byte[valueLength];
|
|
||||||
KeyValueGenerator generator = new KeyValueGenerator();
|
|
||||||
|
|
||||||
startTime();
|
|
||||||
|
|
||||||
Path path = new Path(ROOT_DIR, fileType + ".Performance");
|
|
||||||
System.out.println(ROOT_DIR + Path.SEPARATOR + path.getName());
|
|
||||||
FSDataOutputStream fout = createFSOutput(path);
|
|
||||||
|
|
||||||
if ("HFile".equals(fileType)){
|
|
||||||
HFileContextBuilder builder = new HFileContextBuilder()
|
|
||||||
.withCompression(AbstractHFileWriter.compressionByName(codecName))
|
|
||||||
.withBlockSize(minBlockSize);
|
|
||||||
if (cipherName != "none") {
|
|
||||||
byte[] cipherKey = new byte[AES.KEY_LENGTH];
|
|
||||||
new SecureRandom().nextBytes(cipherKey);
|
|
||||||
builder.withEncryptionContext(
|
|
||||||
Encryption.newContext(conf)
|
|
||||||
.setCipher(Encryption.getCipher(conf, cipherName))
|
|
||||||
.setKey(cipherKey));
|
|
||||||
}
|
|
||||||
HFileContext context = builder.build();
|
|
||||||
System.out.println("HFile write method: ");
|
|
||||||
HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
|
|
||||||
.withOutputStream(fout)
|
|
||||||
.withFileContext(context)
|
|
||||||
.withComparator(new KeyValue.RawBytesComparator())
|
|
||||||
.create();
|
|
||||||
|
|
||||||
// Writing value in one shot.
|
|
||||||
for (long l=0; l<rows; l++ ) {
|
|
||||||
generator.getKey(key);
|
|
||||||
generator.getValue(value);
|
|
||||||
writer.append(CellUtil.createCell(key, value));
|
|
||||||
totalBytesWritten += key.length;
|
|
||||||
totalBytesWritten += value.length;
|
|
||||||
}
|
|
||||||
writer.close();
|
|
||||||
} else if ("SequenceFile".equals(fileType)){
|
|
||||||
CompressionCodec codec = null;
|
|
||||||
if ("gz".equals(codecName))
|
|
||||||
codec = new GzipCodec();
|
|
||||||
else if (!"none".equals(codecName))
|
|
||||||
throw new IOException("Codec not supported.");
|
|
||||||
|
|
||||||
SequenceFile.Writer writer;
|
|
||||||
|
|
||||||
//TODO
|
|
||||||
//JobConf conf = new JobConf();
|
|
||||||
|
|
||||||
if (!"none".equals(codecName))
|
|
||||||
writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
|
|
||||||
BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
|
|
||||||
else
|
|
||||||
writer = SequenceFile.createWriter(conf, fout, BytesWritable.class,
|
|
||||||
BytesWritable.class, SequenceFile.CompressionType.NONE, null);
|
|
||||||
|
|
||||||
BytesWritable keyBsw;
|
|
||||||
BytesWritable valBsw;
|
|
||||||
for (long l=0; l<rows; l++ ) {
|
|
||||||
|
|
||||||
generator.getKey(key);
|
|
||||||
keyBsw = new BytesWritable(key);
|
|
||||||
totalBytesWritten += keyBsw.getSize();
|
|
||||||
|
|
||||||
generator.getValue(value);
|
|
||||||
valBsw = new BytesWritable(value);
|
|
||||||
writer.append(keyBsw, valBsw);
|
|
||||||
totalBytesWritten += valBsw.getSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.close();
|
|
||||||
} else
|
|
||||||
throw new IOException("File Type is not supported");
|
|
||||||
|
|
||||||
fout.close();
|
|
||||||
stopTime();
|
|
||||||
|
|
||||||
printlnWithTimestamp("Data written: ");
|
|
||||||
printlnWithTimestamp(" rate = " +
|
|
||||||
totalBytesWritten / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
|
||||||
printlnWithTimestamp(" total = " + totalBytesWritten + "B");
|
|
||||||
|
|
||||||
printlnWithTimestamp("File written: ");
|
|
||||||
printlnWithTimestamp(" rate = " +
|
|
||||||
fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
|
||||||
printlnWithTimestamp(" total = " + fs.getFileStatus(path).getLen() + "B");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void timeReading(String fileType, int keyLength, int valueLength,
|
|
||||||
long rows, int method) throws IOException {
|
|
||||||
System.out.println("Reading file of type: " + fileType);
|
|
||||||
Path path = new Path(ROOT_DIR, fileType + ".Performance");
|
|
||||||
System.out.println("Input file size: " + fs.getFileStatus(path).getLen());
|
|
||||||
long totalBytesRead = 0;
|
|
||||||
|
|
||||||
|
|
||||||
ByteBuffer val;
|
|
||||||
|
|
||||||
ByteBuffer key;
|
|
||||||
|
|
||||||
startTime();
|
|
||||||
FSDataInputStream fin = fs.open(path);
|
|
||||||
|
|
||||||
if ("HFile".equals(fileType)){
|
|
||||||
HFile.Reader reader = HFile.createReaderFromStream(path, fs.open(path),
|
|
||||||
fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
|
|
||||||
reader.loadFileInfo();
|
|
||||||
switch (method) {
|
|
||||||
|
|
||||||
case 0:
|
|
||||||
case 1:
|
|
||||||
default:
|
|
||||||
{
|
|
||||||
HFileScanner scanner = reader.getScanner(false, false);
|
|
||||||
scanner.seekTo();
|
|
||||||
for (long l=0; l<rows; l++ ) {
|
|
||||||
key = scanner.getKey();
|
|
||||||
val = scanner.getValue();
|
|
||||||
totalBytesRead += key.limit() + val.limit();
|
|
||||||
scanner.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader.close();
|
|
||||||
} else if("SequenceFile".equals(fileType)){
|
|
||||||
|
|
||||||
SequenceFile.Reader reader;
|
|
||||||
reader = new SequenceFile.Reader(fs, path, new Configuration());
|
|
||||||
|
|
||||||
if (reader.getCompressionCodec() != null) {
|
|
||||||
printlnWithTimestamp("Compression codec class: " + reader.getCompressionCodec().getClass());
|
|
||||||
} else
|
|
||||||
printlnWithTimestamp("Compression codec class: " + "none");
|
|
||||||
|
|
||||||
BytesWritable keyBsw = new BytesWritable();
|
|
||||||
BytesWritable valBsw = new BytesWritable();
|
|
||||||
|
|
||||||
for (long l=0; l<rows; l++ ) {
|
|
||||||
reader.next(keyBsw, valBsw);
|
|
||||||
totalBytesRead += keyBsw.getSize() + valBsw.getSize();
|
|
||||||
}
|
|
||||||
reader.close();
|
|
||||||
|
|
||||||
//TODO make a tests for other types of SequenceFile reading scenarios
|
|
||||||
|
|
||||||
} else {
|
|
||||||
throw new IOException("File Type not supported.");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
//printlnWithTimestamp("Closing reader");
|
|
||||||
fin.close();
|
|
||||||
stopTime();
|
|
||||||
//printlnWithTimestamp("Finished close");
|
|
||||||
|
|
||||||
printlnWithTimestamp("Finished in " + getIntervalMillis() + "ms");
|
|
||||||
printlnWithTimestamp("Data read: ");
|
|
||||||
printlnWithTimestamp(" rate = " +
|
|
||||||
totalBytesRead / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
|
||||||
printlnWithTimestamp(" total = " + totalBytesRead + "B");
|
|
||||||
|
|
||||||
printlnWithTimestamp("File read: ");
|
|
||||||
printlnWithTimestamp(" rate = " +
|
|
||||||
fs.getFileStatus(path).getLen() / getIntervalMillis() * 1000 / 1024 / 1024 + "MB/s");
|
|
||||||
printlnWithTimestamp(" total = " + fs.getFileStatus(path).getLen() + "B");
|
|
||||||
|
|
||||||
//TODO uncomment this for final committing so test files is removed.
|
|
||||||
//fs.delete(path, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testRunComparisons() throws IOException {
|
|
||||||
|
|
||||||
int keyLength = 100; // 100B
|
|
||||||
int valueLength = 5*1024; // 5KB
|
|
||||||
int minBlockSize = 10*1024*1024; // 10MB
|
|
||||||
int rows = 10000;
|
|
||||||
|
|
||||||
System.out.println("****************************** Sequence File *****************************");
|
|
||||||
|
|
||||||
timeWrite("SequenceFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("SequenceFile", keyLength, valueLength, rows, -1);
|
|
||||||
|
|
||||||
System.out.println("");
|
|
||||||
System.out.println("----------------------");
|
|
||||||
System.out.println("");
|
|
||||||
|
|
||||||
/* DISABLED LZO
|
|
||||||
timeWrite("SequenceFile", keyLength, valueLength, "lzo", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("SequenceFile", keyLength, valueLength, rows, -1);
|
|
||||||
|
|
||||||
System.out.println("");
|
|
||||||
System.out.println("----------------------");
|
|
||||||
System.out.println("");
|
|
||||||
|
|
||||||
/* Sequence file can only use native hadoop libs gzipping so commenting out.
|
|
||||||
*/
|
|
||||||
try {
|
|
||||||
timeWrite("SequenceFile", keyLength, valueLength, "gz", "none", rows, null,
|
|
||||||
minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("SequenceFile", keyLength, valueLength, rows, -1);
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
System.out.println("Skipping sequencefile gz: " + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
System.out.println("\n\n\n");
|
|
||||||
System.out.println("****************************** HFile *****************************");
|
|
||||||
|
|
||||||
timeWrite("HFile", keyLength, valueLength, "none", "none", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
|
||||||
|
|
||||||
System.out.println("");
|
|
||||||
System.out.println("----------------------");
|
|
||||||
System.out.println("");
|
|
||||||
|
|
||||||
timeWrite("HFile", keyLength, valueLength, "none", "aes", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
|
||||||
|
|
||||||
System.out.println("");
|
|
||||||
System.out.println("----------------------");
|
|
||||||
System.out.println("");
|
|
||||||
|
|
||||||
/* DISABLED LZO
|
|
||||||
timeWrite("HFile", keyLength, valueLength, "lzo", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 1 );
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 2 );
|
|
||||||
|
|
||||||
System.out.println("");
|
|
||||||
System.out.println("----------------------");
|
|
||||||
System.out.println("");
|
|
||||||
*/
|
|
||||||
|
|
||||||
timeWrite("HFile", keyLength, valueLength, "gz", "none", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
|
||||||
|
|
||||||
System.out.println("");
|
|
||||||
System.out.println("----------------------");
|
|
||||||
System.out.println("");
|
|
||||||
|
|
||||||
timeWrite("HFile", keyLength, valueLength, "gz", "aes", rows, null, minBlockSize);
|
|
||||||
System.out.println("\n+++++++\n");
|
|
||||||
timeReading("HFile", keyLength, valueLength, rows, 0 );
|
|
||||||
|
|
||||||
System.out.println("\n\n\n\nNotes: ");
|
|
||||||
System.out.println(" * Timing includes open/closing of files.");
|
|
||||||
System.out.println(" * Timing includes reading both Key and Value");
|
|
||||||
System.out.println(" * Data is generated as random bytes. Other methods e.g. using " +
|
|
||||||
"dictionary with care for distributation of words is under development.");
|
|
||||||
System.out.println(" * Timing of write currently, includes random value/key generations. " +
|
|
||||||
"Which is the same for Sequence File and HFile. Another possibility is to generate " +
|
|
||||||
"test data beforehand");
|
|
||||||
System.out.println(" * We need to mitigate cache effect on benchmark. We can apply several " +
|
|
||||||
"ideas, for next step we do a large dummy read between benchmark read to dismantle " +
|
|
||||||
"caching of data. Renaming of file may be helpful. We can have a loop that reads with" +
|
|
||||||
" the same method several times and flood cache every time and average it to get a" +
|
|
||||||
" better number.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void addOptions() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void processOptions(CommandLine cmd) {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int doWork() throws Exception {
|
|
||||||
testRunComparisons();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
int ret = ToolRunner.run(HBaseConfiguration.create(), new TestHFilePerformance(), args);
|
|
||||||
System.exit(ret);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue