HBASE-4944. Optionally verify bulk loaded HFiles
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1210311 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5021d29ef4
commit
d9f39151c5
|
@ -0,0 +1,40 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
 * Thrown when an invalid HFile format is detected, e.g. while verifying an
 * HFile during a bulk load.
 */
public class InvalidHFileException extends IOException {
  private static final long serialVersionUID = 4660352028739861249L;

  /** Constructs an exception with no detail message. */
  public InvalidHFileException() {
    super();
  }

  /**
   * Constructs an exception with the given detail message.
   *
   * @param s message describing why the HFile is invalid
   */
  public InvalidHFileException(String s) {
    super(s);
  }
}
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.Compression;
|
import org.apache.hadoop.hbase.io.hfile.Compression;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
|
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||||
|
@ -123,6 +124,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
private final String storeNameStr;
|
private final String storeNameStr;
|
||||||
private CompactionProgress progress;
|
private CompactionProgress progress;
|
||||||
private final int compactionKVMax;
|
private final int compactionKVMax;
|
||||||
|
private final boolean verifyBulkLoads;
|
||||||
|
|
||||||
// not private for testing
|
// not private for testing
|
||||||
/* package */ScanInfo scanInfo;
|
/* package */ScanInfo scanInfo;
|
||||||
|
@ -222,6 +224,9 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
|
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
|
||||||
this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
|
this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
|
||||||
|
|
||||||
|
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
|
||||||
|
false);
|
||||||
|
|
||||||
if (Store.closeCheckInterval == 0) {
|
if (Store.closeCheckInterval == 0) {
|
||||||
Store.closeCheckInterval = conf.getInt(
|
Store.closeCheckInterval = conf.getInt(
|
||||||
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
|
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
|
||||||
|
@ -355,9 +360,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This throws a WrongRegionException if the bulkHFile does not fit in this
|
* This throws a WrongRegionException if the HFile does not fit in this
|
||||||
* region.
|
* region, or an InvalidHFileException if the HFile is not valid.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
void assertBulkLoadHFileOk(Path srcPath) throws IOException {
|
void assertBulkLoadHFileOk(Path srcPath) throws IOException {
|
||||||
HFile.Reader reader = null;
|
HFile.Reader reader = null;
|
||||||
|
@ -386,6 +390,34 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
"Bulk load file " + srcPath.toString() + " does not fit inside region "
|
"Bulk load file " + srcPath.toString() + " does not fit inside region "
|
||||||
+ this.region);
|
+ this.region);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (verifyBulkLoads) {
|
||||||
|
KeyValue prevKV = null;
|
||||||
|
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||||
|
scanner.seekTo();
|
||||||
|
do {
|
||||||
|
KeyValue kv = scanner.getKeyValue();
|
||||||
|
if (prevKV != null) {
|
||||||
|
if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(),
|
||||||
|
prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(),
|
||||||
|
kv.getRowLength()) > 0) {
|
||||||
|
throw new InvalidHFileException("Previous row is greater than"
|
||||||
|
+ " current row: path=" + srcPath + " previous="
|
||||||
|
+ Bytes.toStringBinary(prevKV.getKey()) + " current="
|
||||||
|
+ Bytes.toStringBinary(kv.getKey()));
|
||||||
|
}
|
||||||
|
if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(),
|
||||||
|
prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(),
|
||||||
|
kv.getFamilyLength()) != 0) {
|
||||||
|
throw new InvalidHFileException("Previous key had different"
|
||||||
|
+ " family compared to current key: path=" + srcPath
|
||||||
|
+ " previous=" + Bytes.toStringBinary(prevKV.getFamily())
|
||||||
|
+ " current=" + Bytes.toStringBinary(kv.getFamily()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prevKV = kv;
|
||||||
|
} while (scanner.next());
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (reader != null) reader.close();
|
if (reader != null) reader.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue