diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index fe9745eeea5..1892f5467ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -227,6 +227,7 @@ public class Scan extends Query { filter = scan.getFilter(); // clone? loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue(); consistency = scan.getConsistency(); + this.setIsolationLevel(scan.getIsolationLevel()); reversed = scan.isReversed(); asyncPrefetch = scan.isAsyncPrefetch(); small = scan.isSmall(); @@ -271,6 +272,7 @@ public class Scan extends Query { this.getScan = true; this.asyncPrefetch = false; this.consistency = get.getConsistency(); + this.setIsolationLevel(get.getIsolationLevel()); for (Map.Entry attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 7db1c76797d..7242791d805 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -842,7 +842,7 @@ public final class CellUtil { final int tagsLength = cell.getTagsLength(); // Save an object allocation where we can if (tagsLength == 0) { - return EMPTY_TAGS_ITR; + return TagUtil.EMPTY_TAGS_ITR; } if (cell instanceof ByteBufferedCell) { return tagsIterator(((ByteBufferedCell) cell).getTagsByteBuffer(), @@ -1388,7 +1388,7 @@ public final class CellUtil { /** * Compares the row of two keyvalues for equality - * + * * @param left * @param right * @return True if rows match. @@ -2307,4 +2307,4 @@ public final class CellUtil { return Type.DeleteFamily.getCode(); } } -} \ No newline at end of file +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1b71cb453e7..4e07e6abd59 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1012,11 +1012,6 @@ public final class HConstants { public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop"; - /** - * The byte array represents for NO_NEXT_INDEXED_KEY; - * The actual value is irrelevant because this is always compared by reference. 
-   */
-  public static final Cell NO_NEXT_INDEXED_KEY = new KeyValue();
 
   /** delimiter used between portions of a region name */
   public static final int DELIMITER = ',';
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java
index 15ddfc8c84c..65f0cad71dc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -104,7 +105,7 @@ public final class TagUtil {
    * @return the serialized tag data as bytes
    */
   public static byte[] fromList(List<Tag> tags) {
-    if (tags.isEmpty()) {
+    if (tags == null || tags.isEmpty()) {
       return HConstants.EMPTY_BYTE_ARRAY;
     }
     int length = 0;
@@ -216,4 +217,70 @@ public final class TagUtil {
     }
     return StreamUtils.readRawVarint32(tag.getValueByteBuffer(), offset);
   }
+
+  /**
+   * @return A List<Tag> of any Tags found in <code>cell</code>, else null.
+   */
+  public static List<Tag> carryForwardTags(final Cell cell) {
+    return carryForwardTags(null, cell);
+  }
+
+  /**
+   * Add to <code>tagsOrNull</code> any Tags <code>cell</code> is carrying; returns null if the
+   * cell has no Tags and <code>tagsOrNull</code> is null.
+   */
+  public static List<Tag> carryForwardTags(final List<Tag> tagsOrNull, final Cell cell) {
+    Iterator<Tag> itr = CellUtil.tagsIterator(cell);
+    if (itr == EMPTY_TAGS_ITR) {
+      // If no Tags, return early.
+      return tagsOrNull;
+    }
+    List<Tag> tags = tagsOrNull;
+    if (tags == null) {
+      tags = new ArrayList<Tag>();
+    }
+    while (itr.hasNext()) {
+      tags.add(itr.next());
+    }
+    return tags;
+  }
+
+  /**
+   * Carry forward the TTL tag.
+   * @return <code>tagsOrNull</code> unchanged when ttl is Long.MAX_VALUE, else with the TTL tag
+   * appended.
+   */
+  public static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull, final long ttl) {
+    if (ttl == Long.MAX_VALUE) {
+      return tagsOrNull;
+    }
+    List<Tag> tags = tagsOrNull;
+    // If we are making the array in here, given we are the last thing checked, we'll be the only
+    // thing in the array so set its size to '1' (I saw this being done in earlier versions of
+    // tag-handling).
+    if (tags == null) {
+      tags = new ArrayList<Tag>(1);
+    }
+    tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+    return tags;
+  }
+
+  /**
+   * Iterator returned when no Tags. Used by CellUtil too.
+   */
+  static final Iterator<Tag> EMPTY_TAGS_ITR = new Iterator<Tag>() {
+    @Override
+    public boolean hasNext() {
+      return false;
+    }
+
+    @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IT_NO_SUCH_ELEMENT",
+        justification="Intentional")
+    public Tag next() {
+      return null;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  };
 }
\ No newline at end of file
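Review note: the three TagUtil helpers added above are meant to chain: carryForwardTags seeds or extends a possibly-null list, carryForwardTTLTag appends last, and the now null-tolerant fromList serializes whatever resulted. A minimal sketch of that composition, assuming only the methods in this patch; the class, method name, and ttl variable are illustrative, not part of the change:

```java
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;

// Hypothetical caller showing the intended call chain of the new helpers.
public class CarryForwardSketch {
  static byte[] tagsForRewrittenCell(Cell cell, long ttl) {
    List<Tag> tags = TagUtil.carryForwardTags(cell); // null when the cell carries no tags
    tags = TagUtil.carryForwardTTLTag(tags, ttl);    // no-op when ttl == Long.MAX_VALUE
    return TagUtil.fromList(tags);                   // EMPTY_BYTE_ARRAY for a null/empty list
  }
}
```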
+ */ + static final Iterator EMPTY_TAGS_ITR = new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IT_NO_SUCH_ELEMENT", + justification="Intentional") + public Tag next() { + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; } \ No newline at end of file diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 420799f2f6d..48d7a5549ce 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -33,10 +33,9 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; @@ -232,6 +231,6 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { return null; } return new StoreScanner(store, scanInfo, scan, targetCols, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 1bdba3bdc6d..9f29f9738df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -34,18 +34,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue; -import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ObjectIntPair; @@ -289,7 +289,7 @@ public class HFileBlockIndex { if (rootLevelIndex < blockKeys.length - 1) { nextIndexedKey = blockKeys[rootLevelIndex + 1]; } else { - 
nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY; + nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY; } int lookupLevel = 1; // How many levels deep we are in our lookup. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 4db26d1d2a8..a87328054c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -788,7 +789,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } else { // The comparison with no_next_index_key has to be checked if (this.nextIndexedKey != null && - (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader + (this.nextIndexedKey == KeyValueScanner.NO_NEXT_INDEXED_KEY || reader .getComparator().compareKeyIgnoresMvcc(key, nextIndexedKey) < 0)) { // The reader shall continue to scan the current data block instead // of querying the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 922174da26b..a9c64a3f928 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -57,7 +57,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -67,10 +66,8 @@ import javax.security.sasl.SaslServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CallQueueTooBigException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -79,13 +76,15 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; -import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -101,11 +100,11 @@ import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; import org.apache.hadoop.hbase.security.SaslStatus; import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; @@ -126,8 +125,8 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; -import org.codehaus.jackson.map.ObjectMapper; import org.apache.htrace.TraceInfo; +import org.codehaus.jackson.map.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; @@ -1918,11 +1917,21 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { data = null; if (!channel.isOpen()) return; - try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE + try {socket.shutdownOutput();} catch(Exception ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ignored exception", ignored); + } + } if (channel.isOpen()) { try {channel.close();} catch(Exception ignored) {} } - try {socket.close();} catch(Exception ignored) {} + try { + socket.close(); + } catch(Exception ignored) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ignored exception", ignored); + } + } } private UserGroupInformation createUser(ConnectionHeader head) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java index 0675b73b02f..00f197ce6b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java @@ -120,39 +120,40 @@ public class CellCounter extends Configured implements Tool { try { context.getCounter(Counters.ROWS).increment(1); context.write(new Text("Total ROWS"), new IntWritable(1)); - - for (Cell value : values.listCells()) { - currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value)); - String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value)); - if (!thisRowFamilyName.equals(currentFamilyName)) { - currentFamilyName = thisRowFamilyName; - context.getCounter("CF", thisRowFamilyName).increment(1); - if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) { - context.write(new Text("Total Families Across all Rows"), new IntWritable(1)); - context.write(new Text(thisRowFamilyName), new IntWritable(1)); + if (values != null && !values.isEmpty()) { + for (Cell value : values.listCells()) { + currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value)); + String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value)); + if 
(!thisRowFamilyName.equals(currentFamilyName)) { + currentFamilyName = thisRowFamilyName; + context.getCounter("CF", thisRowFamilyName).increment(1); + if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) { + context.write(new Text("Total Families Across all Rows"), new IntWritable(1)); + context.write(new Text(thisRowFamilyName), new IntWritable(1)); + } } - } - String thisRowQualifierName = thisRowFamilyName + separator - + Bytes.toStringBinary(CellUtil.cloneQualifier(value)); - if (!thisRowQualifierName.equals(currentQualifierName)) { - currentQualifierName = thisRowQualifierName; - context.getCounter("CFQL", thisRowQualifierName).increment(1); - context.write(new Text("Total Qualifiers across all Rows"), - new IntWritable(1)); - context.write(new Text(thisRowQualifierName), new IntWritable(1)); - // Intialize versions - context.getCounter("QL_VERSIONS", currentRowKey + separator + - thisRowQualifierName).increment(1); - context.write(new Text(currentRowKey + separator - + thisRowQualifierName + "_Versions"), new IntWritable(1)); + String thisRowQualifierName = thisRowFamilyName + separator + + Bytes.toStringBinary(CellUtil.cloneQualifier(value)); + if (!thisRowQualifierName.equals(currentQualifierName)) { + currentQualifierName = thisRowQualifierName; + context.getCounter("CFQL", thisRowQualifierName).increment(1); + context.write(new Text("Total Qualifiers across all Rows"), + new IntWritable(1)); + context.write(new Text(thisRowQualifierName), new IntWritable(1)); + // Intialize versions + context.getCounter("QL_VERSIONS", currentRowKey + separator + + thisRowQualifierName).increment(1); + context.write(new Text(currentRowKey + separator + + thisRowQualifierName + "_Versions"), new IntWritable(1)); - } else { - // Increment versions - currentQualifierName = thisRowQualifierName; - context.getCounter("QL_VERSIONS", currentRowKey + separator + - thisRowQualifierName).increment(1); - context.write(new Text(currentRowKey + separator - + thisRowQualifierName + "_Versions"), new IntWritable(1)); + } else { + // Increment versions + currentQualifierName = thisRowQualifierName; + context.getCounter("QL_VERSIONS", currentRowKey + separator + + thisRowQualifierName).increment(1); + context.write(new Text(currentRowKey + separator + + thisRowQualifierName + "_Versions"), new IntWritable(1)); + } } } } catch (InterruptedException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index b455828bb66..76390045866 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -1798,7 +1798,8 @@ public class AssignmentManager { invokeUnAssign(regionInfo); break; default: - // No process for other states + // No process for other states + break; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 89ae0d124f5..2984754cdf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -454,10 +454,6 @@ public class DefaultMemStore implements MemStore { * value for that row/family/qualifier. If a KeyValue did already exist, * it will then be removed. *

- * Currently the memstoreTS is kept at 0 so as each insert happens, it will - * be immediately visible. May want to change this so it is atomic across - * all KeyValues. - *

* This is called under row lock, so Get operations will still see updates * atomically. Scans will only see each KeyValue update as atomic. * @@ -484,8 +480,7 @@ public class DefaultMemStore implements MemStore { * family, and qualifier, they are removed. *

* Callers must hold the read lock. - * - * @param cell + * @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells. * @return change in size of MemStore */ private long upsert(Cell cell, long readpoint) { @@ -505,7 +500,7 @@ public class DefaultMemStore implements MemStore { cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); SortedSet ss = cellSet.tailSet(firstCell); Iterator it = ss.iterator(); - // versions visible to oldest scanner + // Versions visible to oldest scanner. int versionsVisible = 0; while ( it.hasNext() ) { Cell cur = it.next(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 05ef2ad1868..c93123cf8ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1263,28 +1263,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - public MultiVersionConcurrencyControl getMVCC() { - return mvcc; - } + @VisibleForTesting + public MultiVersionConcurrencyControl getMVCC() { + return mvcc; + } - @Override - public long getMaxFlushedSeqId() { - return maxFlushedSeqId; - } + @Override + public long getMaxFlushedSeqId() { + return maxFlushedSeqId; + } - @Override - public long getReadpoint(IsolationLevel isolationLevel) { - if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { - // This scan can read even uncommitted transactions - return Long.MAX_VALUE; - } - return mvcc.getReadPoint(); - } + @Override + public long getReadPoint(IsolationLevel isolationLevel) { + if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { + // This scan can read even uncommitted transactions + return Long.MAX_VALUE; + } + return mvcc.getReadPoint(); + } - @Override - public boolean isLoadingCfsOnDemandDefault() { - return this.isLoadingCfsOnDemandDefault; - } + @Override + public long getReadpoint(IsolationLevel isolationLevel) { + return getReadPoint(isolationLevel); + } + + @Override + public boolean isLoadingCfsOnDemandDefault() { + return this.isLoadingCfsOnDemandDefault; + } /** * Close down this HRegion. Flush the cache, shut down each HStore, don't diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7f90e17085e..c65326aa49c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1304,8 +1304,11 @@ public class HStore implements Store { HRegionInfo info = this.region.getRegionInfo(); CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString())); - WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(), - this.region.getRegionInfo(), compactionDescriptor, region.getMVCC()); + // Fix reaching into Region to get the maxWaitForSeqId. + // Does this method belong in Region altogether given it is making so many references up there? 
+    // Could be Region#writeCompactionMarker(compactionDescriptor);
+    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getTableDesc(),
+      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
   }
 
   @VisibleForTesting
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index a9322e39df1..eae713f6d16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 
 /**
@@ -29,6 +30,12 @@ import org.apache.hadoop.hbase.client.Scan;
  */
 @InterfaceAudience.Private
 public interface KeyValueScanner extends Shipper {
+  /**
+   * A sentinel Cell used to signal that there is no next indexed key.
+   * Its actual value is irrelevant because it is only ever compared by reference.
+   */
+  public static final Cell NO_NEXT_INDEXED_KEY = new KeyValue();
+
   /**
    * Look at the next Cell in this scanner, but do not iterate scanner.
    * @return the next Cell
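Review note: NO_NEXT_INDEXED_KEY moves from HConstants into KeyValueScanner, but its contract is unchanged: it is a sentinel checked by reference (==), never through a comparator, which is why its contents do not matter. A sketch of the idiom the StoreScanner hunks below depend on; the wrapper method is made up for illustration:

```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

// Illustrative only: reference equality distinguishes the sentinel from real keys.
public class SentinelSketch {
  static boolean hasRealNextIndexedKey(Cell nextIndexedKey) {
    return nextIndexedKey != null
        && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY;
  }
}
```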
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 5da8bcb75b6..c0bc8fe3d3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -157,6 +157,13 @@ public interface Region extends ConfigurationObserver {
   boolean isLoadingCfsOnDemandDefault();
 
   /** @return readpoint considering given IsolationLevel */
+  long getReadPoint(IsolationLevel isolationLevel);
+
+  /**
+   * @return readpoint considering given IsolationLevel
+   * @deprecated Since 1.2.0. Use {@link #getReadPoint(IsolationLevel)} instead.
+   */
+  @Deprecated
   long getReadpoint(IsolationLevel isolationLevel);
 
   /**
@@ -217,8 +224,8 @@ public interface Region extends ConfigurationObserver {
   // Region read locks
 
   /**
-   * Operation enum is used in {@link Region#startRegionOperation} to provide context for
-   * various checks before any region operation begins.
+   * Operation enum is used in {@link Region#startRegionOperation} and elsewhere to provide
+   * context for various checks.
    */
   enum Operation {
     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
@@ -323,9 +330,10 @@ public interface Region extends ConfigurationObserver {
   OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException;
 
   /**
-   * Atomically checks if a row/family/qualifier value matches the expected val
-   * If it does, it performs the row mutations. If the passed value is null, t
-   * is for the lack of column (ie: non-existence)
+   * Atomically checks if a row/family/qualifier value matches the expected value and, if it does,
+   * performs the mutation. If the passed value is null, the lack of column value
+   * (ie: non-existence) is used. See checkAndRowMutate to do many checkAndPuts at a time on a
+   * single row.
    * @param row to check
    * @param family column family to check
    * @param qualifier column qualifier to check
    * @param compareOp the comparison operator
    * @param comparator
    * @param mutation
    * @param writeToWAL
    * @return true if mutation was applied, false otherwise
    * @throws IOException
    */
   boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOp compareOp,
       ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException;
 
   /**
@@ -340,9 +348,10 @@ public interface Region extends ConfigurationObserver {
-   * Atomically checks if a row/family/qualifier value matches the expected val
-   * If it does, it performs the row mutations. If the passed value is null, t
-   * is for the lack of column (ie: non-existence)
+   * Atomically checks if a row/family/qualifier value matches the expected value and, if it does,
+   * performs the row mutations. If the passed value is null, the lack of column value
+   * (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate
+   * to do one checkAndMutate at a time.
    * @param row to check
    * @param family column family to check
    * @param qualifier column qualifier to check
    * @param compareOp the comparison operator
    * @param comparator
    * @param mutations
    * @param writeToWAL
@@ -350,7 +359,7 @@ public interface Region extends ConfigurationObserver {
-   * @return true if mutation was applied, false otherwise
+   * @return true if mutations were applied, false otherwise
    * @throws IOException
    */
   boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOp compareOp,
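Review note: for readers new to the server-side API documented above, here is a hypothetical Region#checkAndMutate caller (say, inside a coprocessor holding a Region reference). The row, column, and values are invented for illustration; the signature is the one declared in this patch:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndMutateSketch {
  // Applies the Put only if cf:q currently equals "old"; returns whether it was applied.
  static boolean swapIfExpected(Region region) throws IOException {
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(row).addColumn(cf, q, Bytes.toBytes("new"));
    return region.checkAndMutate(row, cf, q, CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("old")), put, true /* writeToWAL */);
  }
}
```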
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
index cfe42e4aafb..34901b7f5c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
@@ -35,7 +35,7 @@ import com.google.protobuf.Message;
  * Defines the procedure to atomically perform multiple scans and mutations
  * on a HRegion.
  *
- * This is invoked by HRegion#processRowsWithLocks().
+ * This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}.
  * This class performs scans and generates mutations and WAL edits.
  * The locks and MVCC will be handled by HRegion.
  *
@@ -98,10 +98,8 @@ public interface RowProcessor {
   /**
    * The hook to be executed after the process() but before applying the Mutations to region. Also
-   * by the time this hook is been called, mvcc transaction is started.
-   * @param region
+   * by the time this hook is called, the mvcc transaction has started.
    * @param walEdit the output WAL edits to apply to write ahead log
-   * @throws IOException
    */
   void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException;
 
@@ -109,8 +107,6 @@ public interface RowProcessor {
    * The hook to be executed after the process() and applying the Mutations to region. The
    * difference of this one with {@link #postProcess(HRegion, WALEdit, boolean)} is this hook will
    * be executed before the mvcc transaction completion.
-   * @param region
-   * @throws IOException
    */
   void postBatchMutate(HRegion region) throws IOException;
 
@@ -156,4 +152,4 @@ public interface RowProcessor {
    * @return The {@link Durability} to use
    */
   Durability useDurability();
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index c4c509f6ca9..2f0d28486a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -260,7 +260,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
     this(store, scan, scanInfo, null,
-      ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
+      ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false);
     if (dropDeletesFromRow == null) {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
         earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
@@ -659,7 +659,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         case SEEK_NEXT_COL:
         {
           Cell nextIndexedKey = getNextIndexedKey();
-          if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
+          if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
               && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
             return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
           }
@@ -669,7 +669,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         case SEEK_NEXT_ROW:
         {
           Cell nextIndexedKey = getNextIndexedKey();
-          if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY
+          if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
              && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
             return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
           }
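Review note: the getReadpoint → getReadPoint rename recurs at many call sites like the StoreScanner constructor above. On the client side, the isolation level that ultimately feeds Region#getReadPoint is set on the Scan; the Scan copy-constructor fix at the top of this patch is what stops that setting from being silently dropped. A small sketch using only public client API (the class and method are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;

public class IsolationLevelSketch {
  static Scan uncommittedScan() throws IOException {
    Scan scan = new Scan();
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    // With this patch, the copy constructor preserves the isolation level too, so the
    // server answers with readpoint Long.MAX_VALUE (see HRegion#getReadPoint above).
    Scan copy = new Scan(scan);
    assert copy.getIsolationLevel() == IsolationLevel.READ_UNCOMMITTED;
    return copy;
  }
}
```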
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 9ae72e64569..47e28b3cfa8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1,4 +1,5 @@
 /**
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -281,8 +282,6 @@ public class FSHLog implements WAL {
 
   private final int slowSyncNs;
 
-  private final static Object [] NO_ARGS = new Object []{};
-
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
@@ -1069,6 +1068,19 @@ public class FSHLog implements WAL {
     }
   }
 
+  /**
+   * NOTE: This append, at a time that is usually after this call returns, starts an
+   * mvcc transaction by calling 'begin' wherein we assign this update a sequenceid. At
+   * assignment time, we stamp all the passed in Cells inside WALEdit with their sequenceId.
+   * You must 'complete' this mvcc transaction by calling
+   * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do
+   * it in the finally of the try/finally block within which this append lives, along with any
+   * subsequent operations like sync or update of memstore, etc. Get the WriteEntry used to
+   * complete the mvcc transaction out of the passed in WALKey walKey parameter. Be warned
+   * that the WriteEntry is not immediately available on return from this method. It WILL be
+   * available subsequent to a sync of this append; otherwise, you will just have to wait on
+   * the WriteEntry to get filled in.
+   */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Will never be null")
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 7f3eb61a015..5fe20613ebe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -106,6 +106,8 @@ class FSWALEntry extends Entry {
 
   /**
    * Here is where a WAL edit gets its sequenceid.
+   * SIDE-EFFECT: stamps the sequenceid into every Cell AND sets the sequenceid into the
+   * MVCC WriteEntry!
    * @return The sequenceid we stamped on this edit.
    * @throws IOException
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index bd5ca9dd208..a9113ec6b6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -1454,7 +1454,7 @@ public class HBaseFsck extends Configured implements Closeable {
         "You may need to restore the previously sidelined hbase:meta");
       return false;
     }
-    meta.batchMutate(puts.toArray(new Put[puts.size()]));
+    meta.batchMutate(puts.toArray(new Put[puts.size()]), HConstants.NO_NONCE, HConstants.NO_NONCE);
     meta.close();
     if (meta.getWAL() != null) {
       meta.getWAL().close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
index ea704f82404..a6f70c3bf3b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
@@ -1060,7 +1060,8 @@ public class RegionSplitter {
         "Could not split region with given user input: " + this);
 
       // remove endpoints, which are included in the splits list
-      return Arrays.copyOfRange(splits, 1, splits.length - 1);
+
+      return splits == null ?
null: Arrays.copyOfRange(splits, 1, splits.length - 1); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 5241dbe085e..a8983ef422d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -109,7 +109,7 @@ public class TestRegionObserverScannerOpenHook { throws IOException { scan.setFilter(new NoDataFilter()); return new StoreScanner(store, store.getScanInfo(), scan, targetCols, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index b64a031d3c7..025a28d466c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -76,9 +76,9 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver { throws IOException { Region r = c.getEnvironment().getRegion(); return scan.isReversed() ? new ReversedStoreScanner(store, - store.getScanInfo(), scan, targetCols, r.getReadpoint(scan + store.getScanInfo(), scan, targetCols, r.getReadPoint(scan .getIsolationLevel())) : new StoreScanner(store, - store.getScanInfo(), scan, targetCols, r.getReadpoint(scan + store.getScanInfo(), scan, targetCols, r.getReadPoint(scan .getIsolationLevel())); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index c988761ce06..b4eb7983a2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -298,7 +298,7 @@ public class TestCoprocessorScanPolicy { newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); return new StoreScanner(store, scanInfo, scan, targetCols, - ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED)); } else { return s; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java index c33b2c233c6..c0939f374c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java @@ -1436,7 +1436,7 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { } } - @Test(timeout=60000) + @Test(timeout=180000) public void testCheckTableLocks() throws Exception { IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(0); EnvironmentEdgeManager.injectEdge(edge);
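Review note: the FSHLog#append javadoc added earlier in this patch prescribes a begin/complete discipline so mvcc cannot get stuck. A minimal sketch of that discipline, assuming this branch's MultiVersionConcurrencyControl#begin() and #complete(WriteEntry); the WAL append, sync, and memstore work are elided, and in the real write path the WriteEntry is taken from the WALKey once the append has been assigned its sequenceid:

```java
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;

public class MvccAppendSketch {
  static void writeWithMvcc(MultiVersionConcurrencyControl mvcc) {
    WriteEntry writeEntry = mvcc.begin(); // starts the mvcc transaction
    try {
      // ... append to the WAL, sync, apply to the memstore ...
    } finally {
      // Always complete, even on failure, or later readers block forever.
      mvcc.complete(writeEntry);
    }
  }
}
```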