HBASE-16477 Remove Writable interface and related code from WALEdit/WALKey

Enis Soztutar 2017-04-10 02:31:42 -07:00
parent df96d328fb
commit 82d554e378
5 changed files with 14 additions and 467 deletions

View File: KeyValueCompression.java (deleted)

@@ -1,133 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
/**
* DO NOT USE. This class is deprecated and should only be used in pre-PB WAL.
*
* Compression class for {@link KeyValue}s written to the WAL. This is not
* synchronized, so synchronization should be handled outside.
*
* Class only compresses and uncompresses row keys, family names, and the
* qualifier. More may be added depending on use patterns.
*/
@Deprecated
@InterfaceAudience.Private
class KeyValueCompression {
/**
* Uncompresses a KeyValue from a DataInput and returns it.
*
* @param in the DataInput
* @param readContext the compressionContext to use.
* @return an uncompressed KeyValue
* @throws IOException
*/
public static KeyValue readKV(DataInput in, CompressionContext readContext)
throws IOException {
int keylength = WritableUtils.readVInt(in);
int vlength = WritableUtils.readVInt(in);
int tagsLength = WritableUtils.readVInt(in);
int length = (int) KeyValue.getKeyValueDataStructureSize(keylength, vlength, tagsLength);
byte[] backingArray = new byte[length];
int pos = 0;
pos = Bytes.putInt(backingArray, pos, keylength);
pos = Bytes.putInt(backingArray, pos, vlength);
// the row
int elemLen = Compressor.uncompressIntoArray(backingArray,
pos + Bytes.SIZEOF_SHORT, in, readContext.rowDict);
checkLength(elemLen, Short.MAX_VALUE);
pos = Bytes.putShort(backingArray, pos, (short)elemLen);
pos += elemLen;
// family
elemLen = Compressor.uncompressIntoArray(backingArray,
pos + Bytes.SIZEOF_BYTE, in, readContext.familyDict);
checkLength(elemLen, Byte.MAX_VALUE);
pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
pos += elemLen;
// qualifier
elemLen = Compressor.uncompressIntoArray(backingArray, pos, in,
readContext.qualifierDict);
pos += elemLen;
// the rest
in.readFully(backingArray, pos, length - pos);
return new KeyValue(backingArray, 0, length);
}
private static void checkLength(int len, int max) throws IOException {
if (len < 0 || len > max) {
throw new IOException(
"Invalid length for compresesed portion of keyvalue: " + len);
}
}
/**
* Compresses and writes keyVal to out, a DataOutput.
*
* @param out the DataOutput
* @param keyVal the KV to compress and write
* @param writeContext the compressionContext to use.
* @throws IOException
*/
public static void writeKV(final DataOutput out, KeyValue keyVal,
CompressionContext writeContext) throws IOException {
byte[] backingArray = keyVal.getBuffer();
int offset = keyVal.getOffset();
// we first write the KeyValue infrastructure as VInts.
WritableUtils.writeVInt(out, keyVal.getKeyLength());
WritableUtils.writeVInt(out, keyVal.getValueLength());
WritableUtils.writeVInt(out, keyVal.getTagsLength());
// now we write the row key, as the row key is likely to be repeated
// We save space only if we attempt to compress elements with duplicates
Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(),
keyVal.getRowLength(), out, writeContext.rowDict);
// now family, if it exists. if it doesn't, we write a 0 length array.
Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(),
keyVal.getFamilyLength(), out, writeContext.familyDict);
// qualifier next
Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(),
keyVal.getQualifierLength(), out,
writeContext.qualifierDict);
// now we write the rest uncompressed
int pos = keyVal.getTimestampOffset();
int remainingLength = keyVal.getLength() + offset - (pos);
out.write(backingArray, pos, remainingLength);
}
}
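
For context on what is being deleted: KeyValueCompression hands the row, family, and qualifier bytes to Compressor, which consults a per-element dictionary (rowDict, familyDict, qualifierDict) so that a value seen before is written as a short index rather than the full bytes. Below is a simplified, self-contained sketch of that index-or-literal idea, for illustration only; TinyDictionary and its exact wire layout are hypothetical stand-ins, not HBase's LRUDictionary.

import java.io.*;
import java.util.*;

// Hypothetical, simplified stand-in for a WAL compression dictionary:
// repeated byte[] elements are replaced by a short index on the wire.
public class TinyDictionary {
  private static final short NOT_IN_DICTIONARY = -1;
  private final List<byte[]> entries = new ArrayList<>();

  private short findEntry(byte[] data) {
    for (short i = 0; i < entries.size(); i++) {
      if (Arrays.equals(entries.get(i), data)) return i;
    }
    return NOT_IN_DICTIONARY;
  }

  // Index-or-literal encoding: a hit writes the 2-byte index; a miss writes
  // a marker, the length, and the literal bytes, then remembers the entry.
  public void writeCompressed(DataOutput out, byte[] data) throws IOException {
    short idx = findEntry(data);
    if (idx != NOT_IN_DICTIONARY) {
      out.writeShort(idx);
    } else {
      out.writeShort(NOT_IN_DICTIONARY);
      out.writeInt(data.length);
      out.write(data);
      entries.add(data.clone());
    }
  }

  // The reader rebuilds the same dictionary by consuming entries in order.
  public byte[] readCompressed(DataInput in) throws IOException {
    short idx = in.readShort();
    if (idx != NOT_IN_DICTIONARY) {
      return entries.get(idx);
    }
    byte[] data = new byte[in.readInt()];
    in.readFully(data);
    entries.add(data.clone());
    return data;
  }

  public static void main(String[] args) throws IOException {
    TinyDictionary writeDict = new TinyDictionary();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    byte[] row = "row-42".getBytes("UTF-8");
    writeDict.writeCompressed(out, row); // literal: 2 + 4 + 6 bytes
    writeDict.writeCompressed(out, row); // repeat: 2-byte index only
    TinyDictionary readDict = new TinyDictionary();
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(new String(readDict.readCompressed(in), "UTF-8"));
    System.out.println(new String(readDict.readCompressed(in), "UTF-8"));
  }
}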

View File: WALEdit.java

@@ -18,22 +18,17 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -43,7 +38,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.Writable;
import com.google.common.annotations.VisibleForTesting;
@@ -51,40 +45,14 @@ import com.google.common.annotations.VisibleForTesting;
/**
* WALEdit: Used in HBase's transaction log (WAL) to represent
* the collection of edits (KeyValue objects) corresponding to a
* single transaction. The class implements "Writable" interface
* for serializing/deserializing a set of KeyValue items.
*
* Previously, if a transaction contains 3 edits to c1, c2, c3 for a row R,
* the WAL would have three log entries as follows:
*
* &lt;logseq1-for-edit1&gt;:&lt;KeyValue-for-edit-c1&gt;
* &lt;logseq2-for-edit2&gt;:&lt;KeyValue-for-edit-c2&gt;
* &lt;logseq3-for-edit3&gt;:&lt;KeyValue-for-edit-c3&gt;
*
* This presents problems because row level atomicity of transactions
* was not guaranteed. If we crash after few of the above appends make
* it, then recovery will restore a partial transaction.
*
* In the new world, all the edits for a given transaction are written
* out as a single record, for example:
*
* &lt;logseq#-for-entire-txn&gt;:&lt;WALEdit-for-entire-txn&gt;
*
* where, the WALEdit is serialized as:
* &lt;-1, # of edits, &lt;KeyValue&gt;, &lt;KeyValue&gt;, ... &gt;
* For example:
* &lt;-1, 3, &lt;KV-for-edit-c1&gt;, &lt;KV-for-edit-c2&gt;, &lt;KV-for-edit-c3&gt;&gt;
*
* The -1 marker is just a special way of being backward compatible with
* an old WAL which would have contained a single &lt;KeyValue&gt;.
*
* The deserializer for WALEdit backward compatibly detects if the record
* is an old style KeyValue or the new style WALEdit.
* single transaction.
*
* All the edits for a given transaction are written out as a single record, in PB format followed
* by Cells written via the WALCellEncoder.
*/
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC })
public class WALEdit implements Writable, HeapSize {
public class WALEdit implements HeapSize {
private static final Log LOG = LogFactory.getLog(WALEdit.class);
// TODO: Get rid of this; see HBASE-8457
@@ -100,22 +68,10 @@ public class WALEdit implements Writable, HeapSize {
@VisibleForTesting
public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
private static final int VERSION_2 = -1;
private final boolean isReplay;
private ArrayList<Cell> cells = null;
public static final WALEdit EMPTY_WALEDIT = new WALEdit();
// Only here for legacy writable deserialization
/**
* @deprecated Legacy
*/
@Deprecated
private NavigableMap<byte[], Integer> scopes;
private CompressionContext compressionContext;
public WALEdit() {
this(false);
}
@@ -162,10 +118,6 @@ public class WALEdit implements Writable, HeapSize {
return this.isReplay;
}
public void setCompressionContext(final CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
public WALEdit add(Cell cell) {
this.cells.add(cell);
return this;
@@ -195,74 +147,6 @@ public class WALEdit implements Writable, HeapSize {
this.cells = cells;
}
public NavigableMap<byte[], Integer> getAndRemoveScopes() {
NavigableMap<byte[], Integer> result = scopes;
scopes = null;
return result;
}
@Override
public void readFields(DataInput in) throws IOException {
cells.clear();
if (scopes != null) {
scopes.clear();
}
int versionOrLength = in.readInt();
// TODO: Change version when we protobuf. Also, change way we serialize KV! Pb it too.
if (versionOrLength == VERSION_2) {
// this is new style WAL entry containing multiple KeyValues.
int numEdits = in.readInt();
for (int idx = 0; idx < numEdits; idx++) {
if (compressionContext != null) {
this.add(KeyValueCompression.readKV(in, compressionContext));
} else {
this.add(KeyValueUtil.create(in));
}
}
int numFamilies = in.readInt();
if (numFamilies > 0) {
if (scopes == null) {
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
}
for (int i = 0; i < numFamilies; i++) {
byte[] fam = Bytes.readByteArray(in);
int scope = in.readInt();
scopes.put(fam, scope);
}
}
} else {
// this is an old style WAL entry. The int that we just
// read is actually the length of a single KeyValue
this.add(KeyValueUtil.create(versionOrLength, in));
}
}
@Override
public void write(DataOutput out) throws IOException {
LOG.warn("WALEdit is being serialized to writable - only expected in test code");
out.writeInt(VERSION_2);
out.writeInt(cells.size());
// We interleave the two lists for code simplicity
for (Cell cell : cells) {
// This is not used in any of the core code flows so it is just fine to convert to KV
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (compressionContext != null) {
KeyValueCompression.writeKV(out, kv, compressionContext);
} else{
KeyValueUtil.write(kv, out);
}
}
if (scopes == null) {
out.writeInt(0);
} else {
out.writeInt(scopes.size());
for (byte[] key : scopes.keySet()) {
Bytes.writeByteArray(out, key);
out.writeInt(scopes.get(key));
}
}
}
/**
* Reads WALEdit from cells.
* @param cellDecoder Cell decoder.
@@ -284,11 +168,6 @@ public class WALEdit implements Writable, HeapSize {
for (Cell cell : cells) {
ret += CellUtil.estimatedHeapSizeOf(cell);
}
if (scopes != null) {
ret += ClassSize.TREEMAP;
ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
// TODO this isn't quite right, need help here
}
return ret;
}
@@ -301,9 +180,6 @@ public class WALEdit implements Writable, HeapSize {
sb.append(cell);
sb.append("; ");
}
if (scopes != null) {
sb.append(" scopes: " + scopes.toString());
}
sb.append(">]");
return sb.toString();
}
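
With the Writable methods gone, WALEdit's public surface is essentially cell accumulation plus HeapSize; persistence happens as one PB-framed record per transaction, with the cells written by the WAL's cell codec. A minimal usage sketch, assuming an HBase build from around this commit on the classpath (WALEdit and its add(Cell) method are as shown in the diff above; the wrapper class is hypothetical):

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class WALEditExample {
  public static void main(String[] args) {
    // All cells added here belong to one transaction and are persisted
    // together as a single WAL record.
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q1"), Bytes.toBytes("v1")));
    edit.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q2"), Bytes.toBytes("v2")));
    System.out.println("cells=" + edit.getCells().size()
        + " heapSize=" + edit.heapSize());
  }
}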

View File: WAL.java

@@ -271,7 +271,6 @@ public interface WAL extends Closeable {
* Compression context
*/
public void setCompressionContext(CompressionContext compressionContext) {
edit.setCompressionContext(compressionContext);
key.setCompressionContext(compressionContext);
}

View File: WALKey.java

@@ -25,7 +25,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -59,8 +58,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*
* <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
*
* Note that protected members marked @InterfaceAudience.Private are only protected
* to support the legacy HLogKey class, which is in a different package.
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into WALEntry.
@@ -100,60 +97,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.sequenceId = writeEntry.getWriteNumber();
}
// REMOVE!!!! No more Writables!!!!
// Should be < 0 (@see HLogKey#readFields(DataInput))
// version 2 supports WAL compression
// public members here are only public because of HLogKey
@InterfaceAudience.Private
protected enum Version {
UNVERSIONED(0),
// Initial number we put on WALKey when we introduced versioning.
INITIAL(-1),
// Version -2 introduced a dictionary compression facility. Only this
// dictionary-based compression is available in version -2.
COMPRESSED(-2);
public final int code;
static final Version[] byCode;
static {
byCode = Version.values();
for (int i = 0; i < byCode.length; i++) {
if (byCode[i].code != -1 * i) {
throw new AssertionError("Values in this enum should be descending by one");
}
}
}
Version(int code) {
this.code = code;
}
public boolean atLeast(Version other) {
return code <= other.code;
}
public static Version fromCode(int code) {
return byCode[code * -1];
}
}
/*
* This is used for reading the log entries created by the previous releases
* (0.94.11) which write the clusters information to the scopes of WALEdit.
*/
private static final String PREFIX_CLUSTER_KEY = ".";
// visible for deprecated HLogKey
@InterfaceAudience.Private
protected static final Version VERSION = Version.COMPRESSED;
// visible for deprecated HLogKey
@InterfaceAudience.Private
protected byte [] encodedRegionName;
private byte [] encodedRegionName;
// visible for deprecated HLogKey
@InterfaceAudience.Private
protected TableName tablename;
private TableName tablename;
/**
* SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
* NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
@@ -165,15 +111,11 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*/
private long origLogSeqNum = 0;
// Time at which this edit was written.
// visible for deprecated HLogKey
@InterfaceAudience.Private
protected long writeTime;
/** Time at which this edit was written. */
private long writeTime;
// The first element in the list is the cluster id on which the change has originated
// visible for deprecated HLogKey
@InterfaceAudience.Private
protected List<UUID> clusterIds;
/** The first element in the list is the cluster id on which the change has originated */
private List<UUID> clusterIds;
private NavigableMap<byte[], Integer> replicationScope;
@@ -186,9 +128,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
// visible for deprecated HLogKey
@InterfaceAudience.Private
protected CompressionContext compressionContext;
private CompressionContext compressionContext;
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
@@ -397,7 +337,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.replicationScope = replicationScope;
}
// For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
// For deserialization. DO NOT USE. See setWriteEntry below.
@InterfaceAudience.Private
protected void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
@@ -486,25 +426,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
}
public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
if (scopes != null) {
Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
.iterator();
while (iterator.hasNext()) {
Map.Entry<byte[], Integer> scope = iterator.next();
String key = Bytes.toString(scope.getKey());
if (key.startsWith(PREFIX_CLUSTER_KEY)) {
addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
.length())));
iterator.remove();
}
}
if (scopes.size() > 0) {
this.replicationScope = scopes;
}
}
}
/**
* Marks that the cluster with the given clusterId has consumed the change
*/
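
One subtlety in the Version enum removed above: version codes are non-positive and descend by one, so atLeast compares with <= (a newer version has a more negative code) and fromCode negates the code to index into values(). A standalone replica of just that mapping, for illustration (the VersionDemo wrapper is hypothetical):

// Replica of the removed WALKey.Version mapping, for illustration only.
public class VersionDemo {
  enum Version {
    UNVERSIONED(0), INITIAL(-1), COMPRESSED(-2);

    final int code;

    Version(int code) { this.code = code; }

    // Codes descend as versions advance, hence the inverted comparison.
    boolean atLeast(Version other) { return code <= other.code; }

    // ordinal(v) == -code(v), so negating the code recovers the constant.
    static Version fromCode(int code) { return values()[-code]; }
  }

  public static void main(String[] args) {
    System.out.println(Version.fromCode(-2));                        // COMPRESSED
    System.out.println(Version.COMPRESSED.atLeast(Version.INITIAL)); // true
    System.out.println(Version.UNVERSIONED.atLeast(Version.INITIAL)); // false
  }
}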

View File: TestKeyValueCompression.java (deleted)

@@ -1,116 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
import com.google.common.collect.Lists;
@Category({RegionServerTests.class, SmallTests.class})
public class TestKeyValueCompression {
private static final byte[] VALUE = Bytes.toBytes("fake value");
private static final int BUF_SIZE = 256*1024;
@Test
public void testCountingKVs() throws Exception {
List<KeyValue> kvs = Lists.newArrayList();
for (int i = 0; i < 400; i++) {
byte[] row = Bytes.toBytes("row" + i);
byte[] fam = Bytes.toBytes("fam" + i);
byte[] qual = Bytes.toBytes("qual" + i);
kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
}
runTestCycle(kvs);
}
@Test
public void testRepeatingKVs() throws Exception {
List<KeyValue> kvs = Lists.newArrayList();
for (int i = 0; i < 400; i++) {
byte[] row = Bytes.toBytes("row" + (i % 10));
byte[] fam = Bytes.toBytes("fam" + (i % 127));
byte[] qual = Bytes.toBytes("qual" + (i % 128));
kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
}
runTestCycle(kvs);
}
private void runTestCycle(List<KeyValue> kvs) throws Exception {
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
for (KeyValue kv : kvs) {
KeyValueCompression.writeKV(buf, kv, ctx);
}
ctx.clear();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
buf.getData(), 0, buf.getLength()));
for (KeyValue kv : kvs) {
KeyValue readBack = KeyValueCompression.readKV(in, ctx);
assertEquals(kv, readBack);
}
}
@Test
public void testKVWithTags() throws Exception {
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
KeyValueCompression.writeKV(buf, createKV(1), ctx);
KeyValueCompression.writeKV(buf, createKV(0), ctx);
KeyValueCompression.writeKV(buf, createKV(2), ctx);
ctx.clear();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
buf.getData(), 0, buf.getLength()));
KeyValue readBack = KeyValueCompression.readKV(in, ctx);
List<Tag> tags = readBack.getTags();
assertEquals(1, tags.size());
}
private KeyValue createKV(int noOfTags) {
byte[] row = Bytes.toBytes("myRow");
byte[] cf = Bytes.toBytes("myCF");
byte[] q = Bytes.toBytes("myQualifier");
byte[] value = Bytes.toBytes("myValue");
List<Tag> tags = new ArrayList<>(noOfTags);
for (int i = 1; i <= noOfTags; i++) {
tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
}
return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
}
}