From 82d554e3783372cc6b05489452c815b57c06f6cd Mon Sep 17 00:00:00 2001
From: Enis Soztutar <enis@apache.org>
Date: Mon, 10 Apr 2017 02:31:42 -0700
Subject: [PATCH] HBASE-16477 Remove Writable interface and related code from
 WALEdit/WALKey

---
 .../regionserver/wal/KeyValueCompression.java | 133 -----------------
 .../hbase/regionserver/wal/WALEdit.java       | 136 +-----------------
 .../java/org/apache/hadoop/hbase/wal/WAL.java |   1 -
 .../org/apache/hadoop/hbase/wal/WALKey.java   |  95 ++----------
 .../wal/TestKeyValueCompression.java          | 116 ---------------
 5 files changed, 14 insertions(+), 467 deletions(-)
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
deleted file mode 100644
index a33ff9e9461..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * DO NOT USE. This class is deprecated and should only be used in pre-PB WAL.
- *
- * Compression class for {@link KeyValue}s written to the WAL. This is not
- * synchronized, so synchronization should be handled outside.
- *
- * Class only compresses and uncompresses row keys, family names, and the
- * qualifier. More may be added depending on use patterns.
- */
-@Deprecated
-@InterfaceAudience.Private
-class KeyValueCompression {
-  /**
-   * Uncompresses a KeyValue from a DataInput and returns it.
-   *
-   * @param in the DataInput
-   * @param readContext the compressionContext to use.
-   * @return an uncompressed KeyValue
-   * @throws IOException
-   */
-
-  public static KeyValue readKV(DataInput in, CompressionContext readContext)
-      throws IOException {
-    int keylength = WritableUtils.readVInt(in);
-    int vlength = WritableUtils.readVInt(in);
-    int tagsLength = WritableUtils.readVInt(in);
-    int length = (int) KeyValue.getKeyValueDataStructureSize(keylength, vlength, tagsLength);
-
-    byte[] backingArray = new byte[length];
-    int pos = 0;
-    pos = Bytes.putInt(backingArray, pos, keylength);
-    pos = Bytes.putInt(backingArray, pos, vlength);
-
-    // the row
-    int elemLen = Compressor.uncompressIntoArray(backingArray,
-      pos + Bytes.SIZEOF_SHORT, in, readContext.rowDict);
-    checkLength(elemLen, Short.MAX_VALUE);
-    pos = Bytes.putShort(backingArray, pos, (short)elemLen);
-    pos += elemLen;
-
-    // family
-    elemLen = Compressor.uncompressIntoArray(backingArray,
-      pos + Bytes.SIZEOF_BYTE, in, readContext.familyDict);
-    checkLength(elemLen, Byte.MAX_VALUE);
-    pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
-    pos += elemLen;
-
-    // qualifier
-    elemLen = Compressor.uncompressIntoArray(backingArray, pos, in,
-      readContext.qualifierDict);
-    pos += elemLen;
-
-    // the rest
-    in.readFully(backingArray, pos, length - pos);
-
-    return new KeyValue(backingArray, 0, length);
-  }
-
-  private static void checkLength(int len, int max) throws IOException {
-    if (len < 0 || len > max) {
-      throw new IOException(
-        "Invalid length for compresesed portion of keyvalue: " + len);
-    }
-  }
-
-  /**
-   * Compresses and writes ourKV to out, a DataOutput.
-   *
-   * @param out the DataOutput
-   * @param keyVal the KV to compress and write
-   * @param writeContext the compressionContext to use.
-   * @throws IOException
-   */
-  public static void writeKV(final DataOutput out, KeyValue keyVal,
-      CompressionContext writeContext) throws IOException {
-    byte[] backingArray = keyVal.getBuffer();
-    int offset = keyVal.getOffset();
-
-    // we first write the KeyValue infrastructure as VInts.
-    WritableUtils.writeVInt(out, keyVal.getKeyLength());
-    WritableUtils.writeVInt(out, keyVal.getValueLength());
-    WritableUtils.writeVInt(out, keyVal.getTagsLength());
-
-    // now we write the row key, as the row key is likely to be repeated
-    // We save space only if we attempt to compress elements with duplicates
-    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(),
-        keyVal.getRowLength(), out, writeContext.rowDict);
-
-
-    // now family, if it exists. if it doesn't, we write a 0 length array.
-    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(),
-        keyVal.getFamilyLength(), out, writeContext.familyDict);
-
-    // qualifier next
-    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(),
-        keyVal.getQualifierLength(), out,
-        writeContext.qualifierDict);
-
-    // now we write the rest uncompressed
-    int pos = keyVal.getTimestampOffset();
-    int remainingLength = keyVal.getLength() + offset - (pos);
-    out.write(backingArray, pos, remainingLength);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 7a8b3d5d096..d5b95ee0c16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -18,22 +18,17 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -43,7 +38,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -51,40 +45,14 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * WALEdit: Used in HBase's transaction log (WAL) to represent
  * the collection of edits (KeyValue objects) corresponding to a
- * single transaction. The class implements "Writable" interface
- * for serializing/deserializing a set of KeyValue items.
- *
- * Previously, if a transaction contains 3 edits to c1, c2, c3 for a row R,
- * the WAL would have three log entries as follows:
- *
- *    <logseq1-for-edit1>:<eyValue-for-edit-c1>
- *    <logseq2-for-edit2>:<KeyValue-for-edit-c2>
- *    <logseq3-for-edit3>:<KeyValue-for-edit-c3>
- *
- * This presents problems because row level atomicity of transactions
- * was not guaranteed. If we crash after few of the above appends make
- * it, then recovery will restore a partial transaction.
- *
- * In the new world, all the edits for a given transaction are written
- * out as a single record, for example:
- *
- *   <logseq#-for-entire-txn>:<WALEdit-for-entire-txn>
- *
- * where, the WALEdit is serialized as:
- *   <-1, # of edits, <KeyValue>, <KeyValue>, ... >
- * For example:
- *   <-1, 3, <KV-for-edit-c1>, <KV-for-edit-c2>, <KV-for-edit-c3>>
- *
- * The -1 marker is just a special way of being backward compatible with
- * an old WAL which would have contained a single <KeyValue>.
- *
- * The deserializer for WALEdit backward compatibly detects if the record
- * is an old style KeyValue or the new style WALEdit.
+ * single transaction.
  *
+ * All the edits for a given transaction are written out as a single record, in PB format followed
+ * by Cells written via the WALCellEncoder.
  */
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
     HBaseInterfaceAudience.COPROC })
-public class WALEdit implements Writable, HeapSize {
+public class WALEdit implements HeapSize {
   private static final Log LOG = LogFactory.getLog(WALEdit.class);
 
   // TODO: Get rid of this; see HBASE-8457
@@ -100,22 +68,10 @@ public class WALEdit implements Writable, HeapSize {
   @VisibleForTesting
   public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
 
-  private static final int VERSION_2 = -1;
   private final boolean isReplay;
 
   private ArrayList<Cell> cells = null;
 
-  public static final WALEdit EMPTY_WALEDIT = new WALEdit();
-
-  // Only here for legacy writable deserialization
-  /**
-   * @deprecated Legacy
-   */
-  @Deprecated
-  private NavigableMap<byte[], Integer> scopes;
-
-  private CompressionContext compressionContext;
-
   public WALEdit() {
     this(false);
   }
@@ -162,10 +118,6 @@ public class WALEdit implements Writable, HeapSize {
     return this.isReplay;
   }
 
-  public void setCompressionContext(final CompressionContext compressionContext) {
-    this.compressionContext = compressionContext;
-  }
-
   public WALEdit add(Cell cell) {
     this.cells.add(cell);
     return this;
@@ -195,74 +147,6 @@ public class WALEdit implements Writable, HeapSize {
     this.cells = cells;
   }
 
-  public NavigableMap<byte[], Integer> getAndRemoveScopes() {
-    NavigableMap<byte[], Integer> result = scopes;
-    scopes = null;
-    return result;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    cells.clear();
-    if (scopes != null) {
-      scopes.clear();
-    }
-    int versionOrLength = in.readInt();
-    // TODO: Change version when we protobuf. Also, change way we serialize KV! Pb it too.
-    if (versionOrLength == VERSION_2) {
-      // this is new style WAL entry containing multiple KeyValues.
-      int numEdits = in.readInt();
-      for (int idx = 0; idx < numEdits; idx++) {
-        if (compressionContext != null) {
-          this.add(KeyValueCompression.readKV(in, compressionContext));
-        } else {
-          this.add(KeyValueUtil.create(in));
-        }
-      }
-      int numFamilies = in.readInt();
-      if (numFamilies > 0) {
-        if (scopes == null) {
-          scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-        }
-        for (int i = 0; i < numFamilies; i++) {
-          byte[] fam = Bytes.readByteArray(in);
-          int scope = in.readInt();
-          scopes.put(fam, scope);
-        }
-      }
-    } else {
-      // this is an old style WAL entry. The int that we just
-      // read is actually the length of a single KeyValue
-      this.add(KeyValueUtil.create(versionOrLength, in));
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    LOG.warn("WALEdit is being serialized to writable - only expected in test code");
-    out.writeInt(VERSION_2);
-    out.writeInt(cells.size());
-    // We interleave the two lists for code simplicity
-    for (Cell cell : cells) {
-      // This is not used in any of the core code flows so it is just fine to convert to KV
-      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-      if (compressionContext != null) {
-        KeyValueCompression.writeKV(out, kv, compressionContext);
-      } else{
-        KeyValueUtil.write(kv, out);
-      }
-    }
-    if (scopes == null) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(scopes.size());
-      for (byte[] key : scopes.keySet()) {
-        Bytes.writeByteArray(out, key);
-        out.writeInt(scopes.get(key));
-      }
-    }
-  }
-
   /**
    * Reads WALEdit from cells.
    * @param cellDecoder Cell decoder.
@@ -284,11 +168,6 @@ public class WALEdit implements Writable, HeapSize {
     for (Cell cell : cells) {
       ret += CellUtil.estimatedHeapSizeOf(cell);
     }
-    if (scopes != null) {
-      ret += ClassSize.TREEMAP;
-      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
-      // TODO this isn't quite right, need help here
-    }
     return ret;
   }
 
@@ -301,9 +180,6 @@ public class WALEdit implements Writable, HeapSize {
       sb.append(cell);
       sb.append("; ");
     }
-    if (scopes != null) {
-      sb.append(" scopes: " + scopes.toString());
-    }
     sb.append(">]");
     return sb.toString();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index b7adc60cc52..2ae20cf680e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -271,7 +271,6 @@ public interface WAL extends Closeable {
      * Compression context
      */
    public void setCompressionContext(CompressionContext compressionContext) {
-      edit.setCompressionContext(compressionContext);
      key.setCompressionContext(compressionContext);
    }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 9a8003a3bbb..bd03e4d0430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -25,7 +25,6 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -59,8 +58,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  *
  * Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
  *
- * Note that protected members marked @InterfaceAudience.Private are only protected
- * to support the legacy HLogKey class, which is in a different package.
  */
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 // purposes. They need to be merged into WALEntry.
@@ -100,60 +97,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.sequenceId = writeEntry.getWriteNumber();
   }
 
-  // REMOVE!!!! No more Writables!!!!
-  // Should be < 0 (@see HLogKey#readFields(DataInput))
-  // version 2 supports WAL compression
-  // public members here are only public because of HLogKey
-  @InterfaceAudience.Private
-  protected enum Version {
-    UNVERSIONED(0),
-    // Initial number we put on WALKey when we introduced versioning.
-    INITIAL(-1),
-    // Version -2 introduced a dictionary compression facility. Only this
-    // dictionary-based compression is available in version -2.
-    COMPRESSED(-2);
+  private byte [] encodedRegionName;
 
-    public final int code;
-    static final Version[] byCode;
-    static {
-      byCode = Version.values();
-      for (int i = 0; i < byCode.length; i++) {
-        if (byCode[i].code != -1 * i) {
-          throw new AssertionError("Values in this enum should be descending by one");
-        }
-      }
-    }
-
-    Version(int code) {
-      this.code = code;
-    }
-
-    public boolean atLeast(Version other) {
-      return code <= other.code;
-    }
-
-    public static Version fromCode(int code) {
-      return byCode[code * -1];
-    }
-  }
-
-  /*
-   * This is used for reading the log entries created by the previous releases
-   * (0.94.11) which write the clusters information to the scopes of WALEdit.
-   */
-  private static final String PREFIX_CLUSTER_KEY = ".";
-
-
-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected static final Version VERSION = Version.COMPRESSED;
-
-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected byte [] encodedRegionName;
-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected TableName tablename;
+  private TableName tablename;
   /**
    * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
    * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
@@ -165,15 +111,11 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   private long origLogSeqNum = 0;
 
-  // Time at which this edit was written.
-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected long writeTime;
+  /** Time at which this edit was written. */
+  private long writeTime;
 
-  // The first element in the list is the cluster id on which the change has originated
-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected List<UUID> clusterIds;
+  /** The first element in the list is the cluster id on which the change has originated */
+  private List<UUID> clusterIds;
 
   private NavigableMap<byte[], Integer> replicationScope;
 
@@ -186,9 +128,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
   public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
 
-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected CompressionContext compressionContext;
+  private CompressionContext compressionContext;
 
   public WALKey() {
     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
@@ -397,7 +337,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.replicationScope = replicationScope;
   }
 
-  // For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
+  // For deserialization. DO NOT USE. See setWriteEntry below.
   @InterfaceAudience.Private
   protected void setSequenceId(long sequenceId) {
     this.sequenceId = sequenceId;
   }
@@ -486,25 +426,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     }
   }
 
-  public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
-    if (scopes != null) {
-      Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
-          .iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<byte[], Integer> scope = iterator.next();
-        String key = Bytes.toString(scope.getKey());
-        if (key.startsWith(PREFIX_CLUSTER_KEY)) {
-          addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
-              .length())));
-          iterator.remove();
-        }
-      }
-      if (scopes.size() > 0) {
-        this.replicationScope = scopes;
-      }
-    }
-  }
-
   /**
    * Marks that the cluster with the given clusterId has consumed the change
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
deleted file mode 100644
index 4a256a6b78e..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.*;
-
-import com.google.common.collect.Lists;
-
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestKeyValueCompression {
-  private static final byte[] VALUE = Bytes.toBytes("fake value");
-  private static final int BUF_SIZE = 256*1024;
-
-  @Test
-  public void testCountingKVs() throws Exception {
-    List<KeyValue> kvs = Lists.newArrayList();
-    for (int i = 0; i < 400; i++) {
-      byte[] row = Bytes.toBytes("row" + i);
-      byte[] fam = Bytes.toBytes("fam" + i);
-      byte[] qual = Bytes.toBytes("qual" + i);
-      kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
-    }
-
-    runTestCycle(kvs);
-  }
-
-  @Test
-  public void testRepeatingKVs() throws Exception {
-    List<KeyValue> kvs = Lists.newArrayList();
-    for (int i = 0; i < 400; i++) {
-      byte[] row = Bytes.toBytes("row" + (i % 10));
-      byte[] fam = Bytes.toBytes("fam" + (i % 127));
-      byte[] qual = Bytes.toBytes("qual" + (i % 128));
-      kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
-    }
-
-    runTestCycle(kvs);
-  }
-
-  private void runTestCycle(List<KeyValue> kvs) throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
-    DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
-    for (KeyValue kv : kvs) {
-      KeyValueCompression.writeKV(buf, kv, ctx);
-    }
-
-    ctx.clear();
-    DataInputStream in = new DataInputStream(new ByteArrayInputStream(
-        buf.getData(), 0, buf.getLength()));
-    for (KeyValue kv : kvs) {
-      KeyValue readBack = KeyValueCompression.readKV(in, ctx);
-      assertEquals(kv, readBack);
-    }
-  }
-
-  @Test
-  public void testKVWithTags() throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
-    DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
-    KeyValueCompression.writeKV(buf, createKV(1), ctx);
-    KeyValueCompression.writeKV(buf, createKV(0), ctx);
-    KeyValueCompression.writeKV(buf, createKV(2), ctx);
-
-    ctx.clear();
-    DataInputStream in = new DataInputStream(new ByteArrayInputStream(
-        buf.getData(), 0, buf.getLength()));
-
-    KeyValue readBack = KeyValueCompression.readKV(in, ctx);
-    List<Tag> tags = readBack.getTags();
-    assertEquals(1, tags.size());
-  }
-
-  private KeyValue createKV(int noOfTags) {
-    byte[] row = Bytes.toBytes("myRow");
-    byte[] cf = Bytes.toBytes("myCF");
-    byte[] q = Bytes.toBytes("myQualifier");
-    byte[] value = Bytes.toBytes("myValue");
-    List<Tag> tags = new ArrayList<>(noOfTags);
-    for (int i = 1; i <= noOfTags; i++) {
-      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
-    }
-    return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
-  }
-}
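
Editor's note, not part of the patch: with the legacy Writable readFields/write pair removed above, a WALEdit is rebuilt from codec-decoded Cells that follow the PB-serialized WALKey in each WAL entry. Below is a minimal sketch of that read path, assuming an HBase 2.0-era classpath; it mirrors the shape of the WALEdit#readFromCells(Codec.Decoder, int) method whose javadoc survives in the diff context, but the wrapper class and method name here are hypothetical illustrations.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Hypothetical helper for illustration only; not part of HBASE-16477.
public class WALEditReadSketch {
  /**
   * Drains up to expectedCount cells from the decoder into the given WALEdit,
   * the way the PB-based WAL reader consumes the cell run that follows each
   * serialized WALKey. Returns the number of cells actually read.
   */
  static int readEdit(WALEdit edit, Codec.Decoder cellDecoder, int expectedCount)
      throws IOException {
    int read = 0;
    // advance() returns false once the underlying cell stream is exhausted.
    while (read < expectedCount && cellDecoder.advance()) {
      edit.add(cellDecoder.current());
      read++;
    }
    return read;
  }
}
```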