diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/InvalidHFileException.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/InvalidHFileException.java new file mode 100644 index 00000000000..87c7e07d7aa --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/InvalidHFileException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.IOException; + +/** + * Thrown when an invalid HFile format is detected + */ +public class InvalidHFileException extends IOException { + private static final long serialVersionUID = 4660352028739861249L; + + /** constructor */ + public InvalidHFileException() { + super(); + } + + /** + * Constructor + * @param s message + */ + public InvalidHFileException(String s) { + super(s); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index d9e3c61f9d2..ade34dd5c96 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; @@ -123,6 +124,7 @@ public class Store extends SchemaConfigured implements HeapSize { private final String storeNameStr; private CompactionProgress progress; private final int compactionKVMax; + private final boolean verifyBulkLoads; // not private for testing /* package */ScanInfo scanInfo; @@ -222,6 +224,9 @@ public class Store extends SchemaConfigured implements HeapSize { = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE); this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10); + this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", + false); + if (Store.closeCheckInterval == 0) { Store.closeCheckInterval = conf.getInt( "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); @@ -355,9 +360,8 @@ public class Store extends SchemaConfigured implements HeapSize { } /** - * This throws a WrongRegionException if the bulkHFile does not fit in this - * region. - * + * This throws a WrongRegionException if the HFile does not fit in this + * region, or an InvalidHFileException if the HFile is not valid. */ void assertBulkLoadHFileOk(Path srcPath) throws IOException { HFile.Reader reader = null; @@ -386,6 +390,34 @@ public class Store extends SchemaConfigured implements HeapSize { "Bulk load file " + srcPath.toString() + " does not fit inside region " + this.region); } + + if (verifyBulkLoads) { + KeyValue prevKV = null; + HFileScanner scanner = reader.getScanner(false, false, false); + scanner.seekTo(); + do { + KeyValue kv = scanner.getKeyValue(); + if (prevKV != null) { + if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getRowOffset(), + prevKV.getRowLength(), kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength()) > 0) { + throw new InvalidHFileException("Previous row is greater than" + + " current row: path=" + srcPath + " previous=" + + Bytes.toStringBinary(prevKV.getKey()) + " current=" + + Bytes.toStringBinary(kv.getKey())); + } + if (Bytes.compareTo(prevKV.getBuffer(), prevKV.getFamilyOffset(), + prevKV.getFamilyLength(), kv.getBuffer(), kv.getFamilyOffset(), + kv.getFamilyLength()) != 0) { + throw new InvalidHFileException("Previous key had different" + + " family compared to current key: path=" + srcPath + + " previous=" + Bytes.toStringBinary(prevKV.getFamily()) + + " current=" + Bytes.toStringBinary(kv.getFamily())); + } + } + prevKV = kv; + } while (scanner.next()); + } } finally { if (reader != null) reader.close(); }