From c57acf28e7cabcfcbce8ae0006080088cdc47f50 Mon Sep 17 00:00:00 2001
From: Dustin Pho
Date: Mon, 12 Sep 2016 13:25:02 -0700
Subject: [PATCH 1/3] HBASE-16540 Adding checks in Scanner's setStartRow and
setStopRow for invalid row key sizes.
Signed-off-by: Gary Helmling
---
.../org/apache/hadoop/hbase/client/Scan.java | 16 +++++++++++
.../apache/hadoop/hbase/client/TestScan.java | 28 +++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index ee3ed43ee4f..22f611a8cb7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -373,8 +373,16 @@ public class Scan extends Query {
* next closest row after the specified row.
* @param startRow row to start scanner at or after
* @return this
+ * @throws IllegalArgumentException if startRow does not meet criteria
+ * for a row key (when length exceeds {@link HConstants#MAX_ROW_LENGTH})
*/
public Scan setStartRow(byte [] startRow) {
+ if (Bytes.len(startRow) > HConstants.MAX_ROW_LENGTH) {
+ throw new IllegalArgumentException(
+ "startRow's length must be less than or equal to " +
+ HConstants.MAX_ROW_LENGTH + " to meet the criteria" +
+ " for a row key.");
+ }
this.startRow = startRow;
return this;
}
@@ -389,8 +397,16 @@ public class Scan extends Query {
* use {@link #setRowPrefixFilter(byte[])}.
* The 'trailing 0' will not yield the desired result.
* @return this
+ * @throws IllegalArgumentException if stopRow does not meet criteria
+ * for a row key (when length exceeds {@link HConstants#MAX_ROW_LENGTH})
*/
public Scan setStopRow(byte [] stopRow) {
+ if (Bytes.len(stopRow) > HConstants.MAX_ROW_LENGTH) {
+ throw new IllegalArgumentException(
+ "stopRow's length must be less than or equal to " +
+ HConstants.MAX_ROW_LENGTH + " to meet the criteria" +
+ " for a row key.");
+ }
this.stopRow = stopRow;
return this;
}
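
With these checks in place, an oversized key now fails fast on the client, before any RPC is issued. A minimal sketch of the new behavior (HConstants.MAX_ROW_LENGTH is Short.MAX_VALUE, i.e. 32767; the TestScan case below exercises exactly these bounds):

    Scan scan = new Scan();
    scan.setStartRow(new byte[HConstants.MAX_ROW_LENGTH]);       // accepted: exactly at the cap
    try {
      scan.setStopRow(new byte[HConstants.MAX_ROW_LENGTH + 1]);  // rejected: one byte over
    } catch (IllegalArgumentException expected) {
      // thrown client-side by the new length check
    }

Note that null and empty arrays still pass, since Bytes.len(null) returns 0.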
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
index 129914fdcbf..16c74df764a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
@@ -132,5 +134,31 @@ public class TestScan {
fail("should not throw exception");
}
}
+
+ @Test
+ public void testSetStartRowAndSetStopRow() {
+ Scan scan = new Scan();
+ scan.setStartRow(null);
+ scan.setStartRow(new byte[1]);
+ scan.setStartRow(new byte[HConstants.MAX_ROW_LENGTH]);
+ try {
+ scan.setStartRow(new byte[HConstants.MAX_ROW_LENGTH+1]);
+ fail("should've thrown exception");
+ } catch (IllegalArgumentException iae) {
+ } catch (Exception e) {
+ fail("expected IllegalArgumentException to be thrown");
+ }
+
+ scan.setStopRow(null);
+ scan.setStopRow(new byte[1]);
+ scan.setStopRow(new byte[HConstants.MAX_ROW_LENGTH]);
+ try {
+ scan.setStopRow(new byte[HConstants.MAX_ROW_LENGTH+1]);
+ fail("should've thrown exception");
+ } catch (IllegalArgumentException iae) {
+ } catch (Exception e) {
+ fail("expected IllegalArgumentException to be thrown");
+ }
+ }
}
From 2566cfeb60de644f287ac192d360f3fc15376c8f Mon Sep 17 00:00:00 2001
From: chenheng
Date: Tue, 13 Sep 2016 10:07:45 +0800
Subject: [PATCH 2/3] HBASE-16592 Unify Delete request with AP
---
.../hadoop/hbase/client/AbstractResponse.java | 38 +++++++++++
.../hadoop/hbase/client/AsyncProcess.java | 22 +++++--
.../apache/hadoop/hbase/client/HTable.java | 43 ++++++++----
.../hadoop/hbase/client/MultiResponse.java | 7 +-
.../hadoop/hbase/client/SingleResponse.java | 65 +++++++++++++++++++
.../hbase/protobuf/ResponseConverter.java | 14 ++++
.../hadoop/hbase/client/TestAsyncProcess.java | 18 ++---
.../hbase/client/TestFromClientSide.java | 46 +++++++++++++
8 files changed, 222 insertions(+), 31 deletions(-)
create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
new file mode 100644
index 00000000000..7878d0519c3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This class is used to extend AP to process single action requests, such as delete, get, etc.
+ */
+@InterfaceAudience.Private
+abstract class AbstractResponse {
+
+ public enum ResponseType {
+
+ SINGLE (0),
+ MULTI (1);
+
+ ResponseType(int value) {}
+ }
+
+ public abstract ResponseType type();
+}
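
AbstractResponse exists so that one retrying caller can hand back either a multi-action or a single-action result; consumers branch on type() and downcast. A sketch of the dispatch pattern (variable names here are illustrative, not from the patch):

    AbstractResponse res = caller.callWithoutRetries(callable, timeout);
    if (res.type() == AbstractResponse.ResponseType.MULTI) {
      MultiResponse multi = (MultiResponse) res;    // per-region results, fed to receiveMultiAction
    } else {
      SingleResponse single = (SingleResponse) res; // one Result plus the processed flag
    }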
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index c5745e9cf48..15312018a5c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -756,14 +756,14 @@ class AsyncProcess {
@Override
public void run() {
- MultiResponse res;
+ AbstractResponse res;
CancellableRegionServerCallable callable = currentCallable;
try {
// setup the callable based on the actions, if we don't have one already from the request
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
}
- RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+ RpcRetryingCaller<AbstractResponse> caller = createCaller(callable);
try {
if (callsInProgress != null) {
callsInProgress.add(callable);
@@ -785,9 +785,16 @@ class AsyncProcess {
receiveGlobalFailure(multiAction, server, numAttempt, t);
return;
}
-
- // Normal case: we received an answer from the server, and it's not an exception.
- receiveMultiAction(multiAction, server, res, numAttempt);
+ if (res.type() == AbstractResponse.ResponseType.MULTI) {
+ // Normal case: we received an answer from the server, and it's not an exception.
+ receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
+ } else {
+ if (results != null) {
+ SingleResponse singleResponse = (SingleResponse) res;
+ results[0] = singleResponse.getEntry();
+ }
+ decActionCounter(1);
+ }
} catch (Throwable t) {
// Something really bad happened. We are on the send thread that will now die.
LOG.error("Internal AsyncProcess #" + id + " error for "
@@ -1782,8 +1789,9 @@ class AsyncProcess {
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
- protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
- return rpcCallerFactory.<MultiResponse> newCaller();
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
+ CancellableRegionServerCallable callable) {
+ return rpcCallerFactory.<AbstractResponse> newCaller();
}
@VisibleForTesting
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 0d1b156c179..bcbb1dafaf1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -524,18 +523,25 @@ public class HTable implements Table {
@Override
public void delete(final Delete delete)
throws IOException {
- RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
- this.rpcControllerFactory, getName(), delete.getRow()) {
+ CancellableRegionServerCallable<SingleResponse> callable =
+ new CancellableRegionServerCallable<SingleResponse>(
+ connection, getName(), delete.getRow(), this.rpcControllerFactory) {
@Override
- protected Boolean rpcCall() throws Exception {
+ protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = getStub().mutate(getRpcController(), request);
- return Boolean.valueOf(response.getProcessed());
+ return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
- rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ List<Delete> rows = new ArrayList<Delete>();
+ rows.add(delete);
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+ null, null, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
}
/**
@@ -768,21 +774,30 @@ public class HTable implements Table {
final byte [] qualifier, final CompareOp compareOp, final byte [] value,
final Delete delete)
throws IOException {
- RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
- getName(), row) {
+ CancellableRegionServerCallable<SingleResponse> callable =
+ new CancellableRegionServerCallable<SingleResponse>(
+ this.connection, getName(), row, this.rpcControllerFactory) {
@Override
- protected Boolean rpcCall() throws Exception {
+ protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete);
MutateResponse response = getStub().mutate(getRpcController(), request);
- return Boolean.valueOf(response.getProcessed());
+ return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
- return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ List<Delete> rows = new ArrayList<Delete>();
+ rows.add(delete);
+
+ Object[] results = new Object[1];
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+ null, results, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
+ return ((SingleResponse.Entry)results[0]).isProcessed();
}
/**
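
From the caller's point of view the Table API is unchanged; delete and checkAndDelete now simply travel through AsyncProcess instead of a dedicated RpcRetryingCaller. A usage sketch (table, family, and values are illustrative):

    Table table = connection.getTable(TableName.valueOf("t1"));
    Delete d = new Delete(Bytes.toBytes("row1"));
    table.delete(d);   // internally: multiAp.submitAll(...) + ars.waitUntilDone()
    boolean processed = table.checkAndDelete(Bytes.toBytes("row1"),
        Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"), d);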
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 79a9ed34013..18376f480fe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* A container for Result objects, grouped by regionName.
*/
@InterfaceAudience.Private
-public class MultiResponse {
+public class MultiResponse extends AbstractResponse {
// map of regionName to map of Results by the original index for that Result
private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -101,6 +101,11 @@ public class MultiResponse {
return this.results;
}
+ @Override
+ public ResponseType type() {
+ return ResponseType.MULTI;
+ }
+
static class RegionResult{
Map<Integer, Object> result = new HashMap<>();
ClientProtos.RegionLoadStats stat;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
new file mode 100644
index 00000000000..68897b58ba6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Class for single action response
+ */
+@InterfaceAudience.Private
+public class SingleResponse extends AbstractResponse {
+ private Entry entry = null;
+
+ @InterfaceAudience.Private
+ public static class Entry {
+ private Result result = null;
+ private boolean processed = false;
+
+ public Result getResult() {
+ return result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public boolean isProcessed() {
+ return processed;
+ }
+
+ public void setProcessed(boolean processed) {
+ this.processed = processed;
+ }
+
+ }
+
+ public Entry getEntry() {
+ return entry;
+ }
+
+ public void setEntry(Entry entry) {
+ this.entry = entry;
+ }
+ @Override
+ public ResponseType type() {
+ return ResponseType.SINGLE;
+ }
+}
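
ResponseConverter.getResult (below) is what populates this container from a MutateResponse; a minimal construction/consumption sketch, with r standing in for a decoded Result:

    SingleResponse.Entry entry = new SingleResponse.Entry();
    entry.setResult(r);        // Result decoded from the proto response
    entry.setProcessed(true);  // mirrors MutateResponse.getProcessed()
    SingleResponse resp = new SingleResponse();
    resp.setEntry(entry);
    assert resp.type() == AbstractResponse.ResponseType.SINGLE;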
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 76b4ccf8f47..e5deabd2fea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.SingleResponse;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@@ -149,6 +150,19 @@ public final class ResponseConverter {
return results;
}
+
+ public static SingleResponse getResult(final ClientProtos.MutateRequest request,
+ final ClientProtos.MutateResponse response,
+ final CellScanner cells)
+ throws IOException {
+ SingleResponse singleResponse = new SingleResponse();
+ SingleResponse.Entry entry = new SingleResponse.Entry();
+ entry.setResult(ProtobufUtil.toResult(response.getResult(), cells));
+ entry.setProcessed(response.getProcessed());
+ singleResponse.setEntry(entry);
+ return singleResponse;
+ }
+
/**
* Wrap a throwable to an action result.
*
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index e7366a90989..54552d96235 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -218,7 +218,7 @@ public class TestAsyncProcess {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
@@ -234,9 +234,9 @@ public class TestAsyncProcess {
}
});
- return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+ return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
try {
@@ -252,7 +252,7 @@ public class TestAsyncProcess {
}
}
- static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
+ static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
private final IOException e;
@@ -262,7 +262,7 @@ public class TestAsyncProcess {
}
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
throw e;
@@ -281,7 +281,7 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
@@ -332,7 +332,7 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable payloadCallable) {
MultiServerCallable callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
@@ -362,9 +362,9 @@ public class TestAsyncProcess {
replicaCalls.incrementAndGet();
}
- return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+ return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
long sleep = -1;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index bc94b024b8f..f46562540e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
@@ -1857,6 +1858,33 @@ public class TestFromClientSide {
admin.close();
}
+ @Test
+ public void testDeleteWithFailed() throws Exception {
+ TableName TABLE = TableName.valueOf("testDeleteWithFailed");
+
+ byte [][] ROWS = makeNAscii(ROW, 6);
+ byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+ byte [][] VALUES = makeN(VALUE, 5);
+ long [] ts = {1000, 2000, 3000, 4000, 5000};
+
+ Table ht = TEST_UTIL.createTable(TABLE, FAMILIES, 3);
+
+ Put put = new Put(ROW);
+ put.addColumn(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
+ ht.put(put);
+
+ // delete wrong family
+ Delete delete = new Delete(ROW);
+ delete.addFamily(FAMILIES[1], ts[0]);
+ ht.delete(delete);
+
+ Get get = new Get(ROW);
+ get.addFamily(FAMILIES[0]);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ Result result = ht.get(get);
+ assertTrue(Bytes.equals(result.getValue(FAMILIES[0], QUALIFIER), VALUES[0]));
+ }
+
@Test
public void testDeletes() throws Exception {
TableName TABLE = TableName.valueOf("testDeletes");
@@ -4622,6 +4650,24 @@ public class TestFromClientSide {
assertEquals(ok, true);
}
+ @Test
+ public void testCheckAndDelete() throws IOException {
+ final byte [] value1 = Bytes.toBytes("aaaa");
+
+ Table table = TEST_UTIL.createTable(TableName.valueOf("testCheckAndDelete"),
+ FAMILY);
+
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, value1);
+ table.put(put);
+
+ Delete delete = new Delete(ROW);
+ delete.addColumns(FAMILY, QUALIFIER);
+
+ boolean ok = table.checkAndDelete(ROW, FAMILY, QUALIFIER, value1, delete);
+ assertEquals(ok, true);
+ }
+
@Test
public void testCheckAndDeleteWithCompareOp() throws IOException {
final byte [] value1 = Bytes.toBytes("aaaa");
From 77b327320a72ca01b35f655c42f8c13f659dff31 Mon Sep 17 00:00:00 2001
From: anoopsamjohn
Date: Tue, 13 Sep 2016 11:43:26 +0530
Subject: [PATCH 3/3] HBASE-16229 Cleaning up size and heapSize calculation.
---
.../apache/hadoop/hbase/util/ClassSize.java | 15 ++
.../hbase/regionserver/AbstractMemStore.java | 49 +++----
.../regionserver/CompactingMemStore.java | 92 ++++++-------
.../regionserver/CompactionPipeline.java | 31 +++--
.../hbase/regionserver/DefaultMemStore.java | 30 ++--
.../hbase/regionserver/ImmutableSegment.java | 84 ++++++------
.../hadoop/hbase/regionserver/MemStore.java | 6 +-
.../hbase/regionserver/MemStoreCompactor.java | 24 ++--
.../hbase/regionserver/MemStoreSnapshot.java | 2 +-
.../hbase/regionserver/MutableSegment.java | 25 ++--
.../hadoop/hbase/regionserver/Segment.java | 103 ++++++--------
.../hbase/regionserver/SegmentFactory.java | 43 +++---
.../apache/hadoop/hbase/io/TestHeapSize.java | 96 ++++++++++++-
.../regionserver/TestCompactingMemStore.java | 22 +--
.../TestCompactingToCellArrayMapMemStore.java | 4 +-
.../TestPerColumnFamilyFlush.java | 49 ++++---
.../TestWalAndCompactingMemStoreFlush.java | 129 ++++++------------
17 files changed, 419 insertions(+), 385 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index ff9dbcbda7c..85a648375a1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -46,6 +46,12 @@ public class ClassSize {
/** Overhead for ArrayList(0) */
public static final int ARRAYLIST;
+ /** Overhead for LinkedList(0) */
+ public static final int LINKEDLIST;
+
+ /** Overhead for a single entry in LinkedList */
+ public static final int LINKEDLIST_ENTRY;
+
/** Overhead for ByteBuffer */
public static final int BYTE_BUFFER;
@@ -100,6 +106,9 @@ public class ClassSize {
/** Overhead for AtomicBoolean */
public static final int ATOMIC_BOOLEAN;
+ /** Overhead for AtomicReference */
+ public static final int ATOMIC_REFERENCE;
+
/** Overhead for CopyOnWriteArraySet */
public static final int COPYONWRITE_ARRAYSET;
@@ -240,6 +249,10 @@ public class ClassSize {
ARRAYLIST = align(OBJECT + REFERENCE + (2 * Bytes.SIZEOF_INT)) + align(ARRAY);
+ LINKEDLIST = align(OBJECT + (2 * Bytes.SIZEOF_INT) + (2 * REFERENCE));
+
+ LINKEDLIST_ENTRY = align(OBJECT + (2 * REFERENCE));
+
//noinspection PointlessArithmeticExpression
BYTE_BUFFER = align(OBJECT + REFERENCE +
(5 * Bytes.SIZEOF_INT) +
@@ -292,6 +305,8 @@ public class ClassSize {
ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN);
+ ATOMIC_REFERENCE = align(OBJECT + REFERENCE);
+
COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
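
These constants follow the usual ClassSize recipe: object header plus declared fields, rounded up by align(). A sketch of how they compose downstream, mirroring the CompactionPipeline accounting later in this patch:

    // The CompactionPipeline object itself: header, 2 references, 1 long
    long fixed = ClassSize.align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE)
        + Bytes.SIZEOF_LONG);
    // plus the (empty) LinkedList it always holds
    long deep = fixed + ClassSize.LINKEDLIST;
    // and one LINKEDLIST_ENTRY per segment actually sitting in the pipeline
    long withTwoSegments = deep + 2 * ClassSize.LINKEDLIST_ENTRY;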
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 419b76a205e..5e9f6321d5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -52,34 +52,29 @@ public abstract class AbstractMemStore implements MemStore {
private final CellComparator comparator;
// active segment absorbs write operations
- private volatile MutableSegment active;
+ protected volatile MutableSegment active;
// Snapshot of memstore. Made for flusher.
- private volatile ImmutableSegment snapshot;
+ protected volatile ImmutableSegment snapshot;
protected volatile long snapshotId;
// Used to track when to flush
private volatile long timeOfOldestEdit;
- public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT +
- (4 * ClassSize.REFERENCE) +
- (2 * Bytes.SIZEOF_LONG));
-
- public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
- ClassSize.CELL_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
+ public final static long FIXED_OVERHEAD = ClassSize
+ .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
+ public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
protected AbstractMemStore(final Configuration conf, final CellComparator c) {
this.conf = conf;
this.comparator = c;
resetActive();
- this.snapshot = SegmentFactory.instance().createImmutableSegment(c, 0);
+ this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
this.snapshotId = NO_SNAPSHOT_ID;
}
protected void resetActive() {
// Reset heap to not include any keys
- this.active = SegmentFactory.instance().createMutableSegment(conf, comparator, DEEP_OVERHEAD);
+ this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
this.timeOfOldestEdit = Long.MAX_VALUE;
}
@@ -200,8 +195,7 @@ public abstract class AbstractMemStore implements MemStore {
// create a new snapshot and let the old one go.
Segment oldSnapshot = this.snapshot;
if (!this.snapshot.isEmpty()) {
- this.snapshot = SegmentFactory.instance().createImmutableSegment(
- getComparator(), 0);
+ this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
}
this.snapshotId = NO_SNAPSHOT_ID;
oldSnapshot.close();
@@ -213,12 +207,12 @@ public abstract class AbstractMemStore implements MemStore {
*/
@Override
public long heapSize() {
- return getActive().getSize();
+ return size();
}
@Override
public long getSnapshotSize() {
- return getSnapshot().getSize();
+ return this.snapshot.keySize();
}
@Override
@@ -385,7 +379,7 @@ public abstract class AbstractMemStore implements MemStore {
// so we cant add the new Cell w/o knowing what's there already, but we also
// want to take this chance to delete some cells. So two loops (sad)
- SortedSet<Cell> ss = getActive().tailSet(firstCell);
+ SortedSet<Cell> ss = this.active.tailSet(firstCell);
for (Cell cell : ss) {
// if this isnt the row we are interested in, then bail:
if (!CellUtil.matchingColumn(cell, family, qualifier)
@@ -433,38 +427,33 @@ public abstract class AbstractMemStore implements MemStore {
}
}
+ /**
+ * @return The size of the active segment. Means sum of all cell's size.
+ */
protected long keySize() {
- return heapSize() - DEEP_OVERHEAD;
+ return this.active.keySize();
}
protected CellComparator getComparator() {
return comparator;
}
- protected MutableSegment getActive() {
+ @VisibleForTesting
+ MutableSegment getActive() {
return active;
}
- protected ImmutableSegment getSnapshot() {
+ @VisibleForTesting
+ ImmutableSegment getSnapshot() {
return snapshot;
}
- protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
- this.snapshot = snapshot;
- return this;
- }
-
- protected void setSnapshotSize(long snapshotSize) {
- getSnapshot().setSize(snapshotSize);
- }
-
/**
* Check whether anything need to be done based on the current active set size
*/
protected abstract void checkActiveSize();
/**
- * Returns an ordered list of segments from most recent to oldest in memstore
* @return an ordered list of segments from most recent to oldest in memstore
*/
protected abstract List<Segment> getSegments() throws IOException;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 504ddabdab5..177f222af6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,13 +51,6 @@ import org.apache.hadoop.hbase.wal.WAL;
@InterfaceAudience.Private
public class CompactingMemStore extends AbstractMemStore {
- public final static long DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM = ClassSize.align(
- ClassSize.TIMERANGE_TRACKER + ClassSize.CELL_SET + ClassSize.CONCURRENT_SKIPLISTMAP);
-
- public final static long DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM = ClassSize.align(
- ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE +
- ClassSize.CELL_SET + ClassSize.CELL_ARRAY_MAP);
-
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
@@ -75,6 +67,13 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+ public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
+ + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
+ // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+ + Bytes.SIZEOF_LONG // inmemoryFlushSize
+ + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
+ + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD;
+
public CompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices) throws IOException {
super(conf, c);
@@ -100,28 +99,18 @@ public class CompactingMemStore extends AbstractMemStore {
LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize);
}
- public static long getSegmentSize(Segment segment) {
- return segment.keySize();
- }
-
- public static long getSegmentsSize(List<? extends Segment> list) {
- long res = 0;
- for (Segment segment : list) {
- res += getSegmentSize(segment);
- }
- return res;
- }
-
/**
- * @return Total memory occupied by this MemStore.
- * This is not thread safe and the memstore may be changed while computing its size.
- * It is the responsibility of the caller to make sure this doesn't happen.
+ * @return Total memory occupied by this MemStore. This includes active segment size and heap size
+ * overhead of this memstore but won't include any size occupied by the snapshot. We
+ * assume the snapshot will get cleared soon. This is not thread safe and the memstore may
+ * be changed while computing its size. It is the responsibility of the caller to make
+ * sure this doesn't happen.
*/
@Override
public long size() {
- long res = 0;
- for (Segment item : getSegments()) {
- res += item.getSize();
+ long res = DEEP_OVERHEAD + this.active.size();
+ for (Segment item : pipeline.getSegments()) {
+ res += CompactionPipeline.ENTRY_OVERHEAD + item.size();
}
return res;
}
@@ -131,11 +120,13 @@ public class CompactingMemStore extends AbstractMemStore {
* The store may do any post-flush actions at this point.
* One example is to update the WAL with sequence number that is known only at the store level.
*/
- @Override public void finalizeFlush() {
+ @Override
+ public void finalizeFlush() {
updateLowestUnflushedSequenceIdInWAL(false);
}
- @Override public boolean isSloppy() {
+ @Override
+ public boolean isSloppy() {
return true;
}
@@ -148,10 +139,9 @@ public class CompactingMemStore extends AbstractMemStore {
*/
@Override
public MemStoreSnapshot snapshot() {
- MutableSegment active = getActive();
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
- if (!getSnapshot().isEmpty()) {
+ if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
@@ -161,21 +151,22 @@ public class CompactingMemStore extends AbstractMemStore {
+ getFamilyName());
}
stopCompaction();
- pushActiveToPipeline(active);
+ pushActiveToPipeline(this.active);
snapshotId = EnvironmentEdgeManager.currentTime();
pushTailToSnapshot();
}
- return new MemStoreSnapshot(snapshotId, getSnapshot());
+ return new MemStoreSnapshot(snapshotId, this.snapshot);
}
/**
* On flush, how much memory we will clear.
* @return size of data that is going to be flushed
*/
- @Override public long getFlushableSize() {
- long snapshotSize = getSnapshot().getSize();
- if(snapshotSize == 0) {
- //if snapshot is empty the tail of the pipeline is flushed
+ @Override
+ public long getFlushableSize() {
+ long snapshotSize = getSnapshotSize();
+ if (snapshotSize == 0) {
+ // if snapshot is empty the tail of the pipeline is flushed
snapshotSize = pipeline.getTailSize();
}
return snapshotSize > 0 ? snapshotSize : keySize();
@@ -186,7 +177,7 @@ public class CompactingMemStore extends AbstractMemStore {
long minSequenceId = pipeline.getMinSequenceId();
if(minSequenceId != Long.MAX_VALUE) {
byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
- byte[] familyName = getFamilyNameInByte();
+ byte[] familyName = getFamilyNameInBytes();
WAL WAL = getRegionServices().getWAL();
if (WAL != null) {
WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
@@ -197,10 +188,10 @@ public class CompactingMemStore extends AbstractMemStore {
@Override
public List<Segment> getSegments() {
List<? extends Segment> pipelineList = pipeline.getSegments();
- List<Segment> list = new LinkedList<Segment>();
- list.add(getActive());
+ List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
+ list.add(this.active);
list.addAll(pipelineList);
- list.add(getSnapshot());
+ list.add(this.snapshot);
return list;
}
@@ -235,7 +226,7 @@ public class CompactingMemStore extends AbstractMemStore {
}
public String getFamilyName() {
- return Bytes.toString(getFamilyNameInByte());
+ return Bytes.toString(getFamilyNameInBytes());
}
@Override
@@ -248,12 +239,12 @@ public class CompactingMemStore extends AbstractMemStore {
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
- list.add(getActive().getScanner(readPt, order + 1));
+ list.add(this.active.getScanner(readPt, order + 1));
for (Segment item : pipelineList) {
list.add(item.getScanner(readPt, order));
order--;
}
- list.add(getSnapshot().getScanner(readPt, order));
+ list.add(this.snapshot.getScanner(readPt, order));
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
}
@@ -291,11 +282,10 @@ public class CompactingMemStore extends AbstractMemStore {
// Phase I: Update the pipeline
getRegionServices().blockUpdates();
try {
- MutableSegment active = getActive();
if (LOG.isDebugEnabled()) {
LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
}
- pushActiveToPipeline(active);
+ pushActiveToPipeline(this.active);
} finally {
getRegionServices().unblockUpdates();
}
@@ -319,7 +309,7 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
- private byte[] getFamilyNameInByte() {
+ private byte[] getFamilyNameInBytes() {
return store.getFamily().getName();
}
@@ -328,7 +318,7 @@ public class CompactingMemStore extends AbstractMemStore {
}
private boolean shouldFlushInMemory() {
- if(getActive().getSize() > inmemoryFlushSize) { // size above flush threshold
+ if (this.active.size() > inmemoryFlushSize) { // size above flush threshold
// the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
// the insert of the active into the compaction pipeline
return (inMemoryFlushInProgress.compareAndSet(false,true));
@@ -350,8 +340,6 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushActiveToPipeline(MutableSegment active) {
if (!active.isEmpty()) {
- long delta = DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM - DEEP_OVERHEAD;
- active.incSize(delta);
pipeline.pushHead(active);
resetActive();
}
@@ -360,9 +348,7 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushTailToSnapshot() {
ImmutableSegment tail = pipeline.pullTail();
if (!tail.isEmpty()) {
- setSnapshot(tail);
- long size = getSegmentSize(tail);
- setSnapshotSize(size);
+ this.snapshot = tail;
}
}
@@ -428,7 +414,7 @@ public class CompactingMemStore extends AbstractMemStore {
// debug method
public void debug() {
- String msg = "active size="+getActive().getSize();
+ String msg = "active size=" + this.active.size();
msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize;
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e0ba8c3ac0d..6a13f43454a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -24,7 +24,10 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
/**
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@@ -39,13 +42,17 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public class CompactionPipeline {
private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
+ public final static long FIXED_OVERHEAD = ClassSize
+ .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+ public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
+ public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
+
private final RegionServicesForStores region;
private LinkedList<ImmutableSegment> pipeline;
private long version;
private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
- .createImmutableSegment(null,
- CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM);
+ .createImmutableSegment((CellComparator) null);
public CompactionPipeline(RegionServicesForStores region) {
this.region = region;
@@ -105,8 +112,8 @@ public class CompactionPipeline {
}
if (region != null) {
// update the global memstore size counter
- long suffixSize = CompactingMemStore.getSegmentsSize(suffix);
- long newSize = CompactingMemStore.getSegmentSize(segment);
+ long suffixSize = getSegmentsKeySize(suffix);
+ long newSize = segment.keySize();
long delta = suffixSize - newSize;
long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
if (LOG.isDebugEnabled()) {
@@ -117,6 +124,14 @@ public class CompactionPipeline {
return true;
}
+ private static long getSegmentsKeySize(List<? extends Segment> list) {
+ long res = 0;
+ for (Segment segment : list) {
+ res += segment.keySize();
+ }
+ return res;
+ }
+
/**
* If the caller holds the current version, go over the the pipeline and try to flatten each
* segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
@@ -178,20 +193,20 @@ public class CompactionPipeline {
public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE;
- if(!isEmpty()) {
+ if (!isEmpty()) {
minSequenceId = pipeline.getLast().getMinSequenceId();
}
return minSequenceId;
}
public long getTailSize() {
- if(isEmpty()) return 0;
- return CompactingMemStore.getSegmentSize(pipeline.peekLast());
+ if (isEmpty()) return 0;
+ return pipeline.peekLast().keySize();
}
private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) {
version++;
- for(Segment itemInSuffix : suffix) {
+ for (Segment itemInSuffix : suffix) {
itemInSuffix.close();
}
pipeline.removeAll(suffix);
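
The swap accounting above is a straight difference: the global memstore counter gives back whatever the compacted-away suffix held beyond what the replacement segment holds. A worked sketch with made-up numbers:

    long suffixSize = getSegmentsKeySize(suffix); // e.g. 6 MB + 4 MB of cell data
    long newSize = segment.keySize();             // e.g. 7 MB after duplicate elimination
    long delta = suffixSize - newSize;            // 3 MB actually freed
    region.addAndGetGlobalMemstoreSize(-delta);   // shrink the global counter by that much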
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 50ab06bd2f5..b448b04a328 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -83,20 +83,19 @@ public class DefaultMemStore extends AbstractMemStore {
public MemStoreSnapshot snapshot() {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
- if (!getSnapshot().isEmpty()) {
+ if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
- if (!getActive().isEmpty()) {
+ if (!this.active.isEmpty()) {
ImmutableSegment immutableSegment = SegmentFactory.instance().
- createImmutableSegment(getActive());
- setSnapshot(immutableSegment);
- setSnapshotSize(keySize());
+ createImmutableSegment(this.active);
+ this.snapshot = immutableSegment;
resetActive();
}
}
- return new MemStoreSnapshot(this.snapshotId, getSnapshot());
+ return new MemStoreSnapshot(this.snapshotId, this.snapshot);
}
/**
@@ -106,7 +105,7 @@ public class DefaultMemStore extends AbstractMemStore {
*/
@Override
public long getFlushableSize() {
- long snapshotSize = getSnapshot().getSize();
+ long snapshotSize = getSnapshotSize();
return snapshotSize > 0 ? snapshotSize : keySize();
}
@@ -116,8 +115,8 @@ public class DefaultMemStore extends AbstractMemStore {
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
- list.add(getActive().getScanner(readPt, 1));
- list.add(getSnapshot().getScanner(readPt, 0));
+ list.add(this.active.getScanner(readPt, 1));
+ list.add(this.snapshot.getScanner(readPt, 0));
return Collections.<KeyValueScanner> singletonList(
new MemStoreScanner(getComparator(), list));
}
@@ -125,8 +124,8 @@ public class DefaultMemStore extends AbstractMemStore {
@Override
protected List<Segment> getSegments() throws IOException {
List<Segment> list = new ArrayList<Segment>(2);
- list.add(getActive());
- list.add(getSnapshot());
+ list.add(this.active);
+ list.add(this.snapshot);
return list;
}
@@ -137,19 +136,16 @@ public class DefaultMemStore extends AbstractMemStore {
*/
Cell getNextRow(final Cell cell) {
return getLowest(
- getNextRow(cell, getActive().getCellSet()),
- getNextRow(cell, getSnapshot().getCellSet()));
+ getNextRow(cell, this.active.getCellSet()),
+ getNextRow(cell, this.snapshot.getCellSet()));
}
@Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
}
- /**
- * @return Total memory occupied by this MemStore.
- */
@Override
public long size() {
- return heapSize();
+ return this.active.size() + DEEP_OVERHEAD;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 28f14d5c2df..12b79163494 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.client.Scan;
@@ -37,11 +38,12 @@ import java.io.IOException;
*/
@InterfaceAudience.Private
public class ImmutableSegment extends Segment {
- /**
- * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
- * TimeRangeTracker with all its synchronization when doing time range stuff.
- */
- private final TimeRange timeRange;
+
+ private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ + (2 * ClassSize.REFERENCE) // Refs to timeRange and type
+ + ClassSize.TIMERANGE;
+ public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
+ public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
/**
* Types of ImmutableSegment
@@ -51,6 +53,12 @@ public class ImmutableSegment extends Segment {
ARRAY_MAP_BASED,
}
+ /**
+ * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
+ * TimeRangeTracker with all its synchronization when doing time range stuff.
+ */
+ private final TimeRange timeRange;
+
private Type type = Type.SKIPLIST_MAP_BASED;
// whether it is based on CellFlatMap or ConcurrentSkipListMap
@@ -66,9 +74,8 @@ public class ImmutableSegment extends Segment {
*/
protected ImmutableSegment(Segment segment) {
super(segment);
- type = Type.SKIPLIST_MAP_BASED;
- TimeRangeTracker trt = getTimeRangeTracker();
- this.timeRange = trt == null? null: trt.toTimeRange();
+ this.type = Type.SKIPLIST_MAP_BASED;
+ this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
@@ -80,20 +87,14 @@ public class ImmutableSegment extends Segment {
*/
protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, Type type) {
-
- super(null, // initiailize the CellSet with NULL
- comparator, memStoreLAB,
- // initial size of segment metadata (the data per cell is added in createCellArrayMapSet)
- CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM,
- ClassSize.CELL_ARRAY_MAP_ENTRY);
-
+ super(null, // initialize the CellSet with NULL
+ comparator, memStoreLAB);
+ this.type = type;
// build the true CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator);
this.setCellSet(null, cs); // update the CellSet of the new Segment
- this.type = type;
- TimeRangeTracker trt = getTimeRangeTracker();
- this.timeRange = trt == null? null: trt.toTimeRange();
+ this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
/**------------------------------------------------------------------------
@@ -101,15 +102,11 @@ public class ImmutableSegment extends Segment {
* list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction.
*/
- protected ImmutableSegment(
- CellComparator comparator, MemStoreCompactorIterator iterator, MemStoreLAB memStoreLAB) {
-
- super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
- comparator, memStoreLAB,
- // initial size of segment metadata (the data per cell is added in internalAdd)
- CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM,
- ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
-
+ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator,
+ MemStoreLAB memStoreLAB) {
+ super(new CellSet(comparator), // initialize the CellSet with empty CellSet
+ comparator, memStoreLAB);
+ type = Type.SKIPLIST_MAP_BASED;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner is doing all the elimination logic
@@ -118,9 +115,7 @@ public class ImmutableSegment extends Segment {
boolean usedMSLAB = (newKV != c);
internalAdd(newKV, usedMSLAB); //
}
- type = Type.SKIPLIST_MAP_BASED;
- TimeRangeTracker trt = getTimeRangeTracker();
- this.timeRange = trt == null? null: trt.toTimeRange();
+ this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
///////////////////// PUBLIC METHODS /////////////////////
@@ -144,14 +139,16 @@ public class ImmutableSegment extends Segment {
return this.timeRange.getMin();
}
+
@Override
- public long keySize() {
- switch (type){
+ public long size() {
+ switch (this.type) {
case SKIPLIST_MAP_BASED:
- return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM;
+ return keySize() + DEEP_OVERHEAD_CSLM;
case ARRAY_MAP_BASED:
- return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM;
- default: throw new IllegalStateException();
+ return keySize() + DEEP_OVERHEAD_CAM;
+ default:
+ throw new RuntimeException("Unknown type " + type);
}
}
@@ -171,9 +168,6 @@ public class ImmutableSegment extends Segment {
CellSet oldCellSet = getCellSet();
int numOfCells = getCellsCount();
- // each Cell is now represented in CellArrayMap
- constantCellMetaDataSize = ClassSize.CELL_ARRAY_MAP_ENTRY;
-
// build the new (CellSet CellArrayMap based)
CellSet newCellSet = recreateCellArrayMapSet(numOfCells);
type = Type.ARRAY_MAP_BASED;
@@ -214,6 +208,19 @@ public class ImmutableSegment extends Segment {
return new CellSet(cam);
}
+ protected long heapSizeChange(Cell cell, boolean succ) {
+ if (succ) {
+ switch (this.type) {
+ case SKIPLIST_MAP_BASED:
+ return ClassSize
+ .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+ case ARRAY_MAP_BASED:
+ return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+ }
+ }
+ return 0;
+ }
+
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet
// (without compacting iterator)
@@ -239,5 +246,4 @@ public class ImmutableSegment extends Segment {
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
return new CellSet(cam);
}
-
}
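
With this change a segment's size() is its cell data (keySize()) plus a constant that depends on the backing map, and heapSizeChange charges each successfully added cell the per-entry cost of that map. The per-cell arithmetic, sketched for a skip-list backed segment (cell and segment are placeholders):

    // one added cell grows the estimate by the map-entry overhead plus the cell itself
    long perCell = ClassSize.align(
        ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
    // and the segment's total heap footprint is then
    long total = segment.keySize() + ImmutableSegment.DEEP_OVERHEAD_CSLM;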
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 52ae6a391a9..d52b8638783 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -126,7 +126,11 @@ public interface MemStore extends HeapSize {
List<KeyValueScanner> getScanners(long readPt) throws IOException;
/**
- * @return Total memory occupied by this MemStore.
+ * @return Total memory occupied by this MemStore. This includes active segment size and heap size
+ * overhead of this memstore but won't include any size occupied by the snapshot. We
+ * assume the snapshot will get cleared soon. This is not thread safe and the memstore may
+ * be changed while computing its size. It is the responsibility of the caller to make
+ * sure this doesn't happen.
*/
long size();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 470dc9c8daa..714ffe39cb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -22,6 +22,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +40,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
* therefore no special synchronization is required.
*/
@InterfaceAudience.Private
-class MemStoreCompactor {
+public class MemStoreCompactor {
+
+ public static final long DEEP_OVERHEAD = ClassSize
+ .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_DOUBLE
+ + ClassSize.ATOMIC_BOOLEAN);
// Option for external guidance whether flattening is allowed
static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten";
@@ -59,6 +65,15 @@ class MemStoreCompactor {
static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false;
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
+
+ /**
+ * Types of Compaction
+ */
+ private enum Type {
+ COMPACT_TO_SKIPLIST_MAP,
+ COMPACT_TO_ARRAY_MAP
+ }
+
private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline
@@ -73,13 +88,6 @@ class MemStoreCompactor {
double fraction = 0.8;
int immutCellsNum = 0; // number of immutable for compaction cells
- /**
- * Types of Compaction
- */
- private enum Type {
- COMPACT_TO_SKIPLIST_MAP,
- COMPACT_TO_ARRAY_MAP
- }
private Type type = Type.COMPACT_TO_ARRAY_MAP;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index f64979f9935..1bb45118a28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -36,7 +36,7 @@ public class MemStoreSnapshot {
public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
this.id = id;
this.cellsCount = snapshot.getCellsCount();
- this.size = snapshot.getSize();
+ this.size = snapshot.keySize();
this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanner = snapshot.getKeyValueScanner();
this.tagsPresent = snapshot.isTagsPresent();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 1cac7fdd32a..3fb97239e02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -21,17 +21,21 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A mutable segment in memstore, specifically the active segment.
*/
@InterfaceAudience.Private
public class MutableSegment extends Segment {
- protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB,
- long size) {
- super(cellSet, comparator, memStoreLAB, size, ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+
+ public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
+
+ protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
+ super(cellSet, comparator, memStoreLAB);
}
/**
@@ -44,29 +48,28 @@ public class MutableSegment extends Segment {
return internalAdd(cell, mslabUsed);
}
- //methods for test
-
/**
* Returns the first cell in the segment
* @return the first cell in the segment
*/
+ @VisibleForTesting
Cell first() {
return this.getCellSet().first();
}
@Override
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
- return (getTimeRangeTracker().includesTimeRange(scan.getTimeRange())
- && (getTimeRangeTracker().getMax() >= oldestUnexpiredTS));
+ return (this.timeRangeTracker.includesTimeRange(scan.getTimeRange())
+ && (this.timeRangeTracker.getMax() >= oldestUnexpiredTS));
}
@Override
public long getMinTimestamp() {
- return getTimeRangeTracker().getMin();
+ return this.timeRangeTracker.getMin();
}
@Override
- public long keySize() {
- return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM;
+ public long size() {
+ return keySize() + DEEP_OVERHEAD;
}
}
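
The keySize()/size() split above is the heart of this change: the segment's internal counter now tracks only the heap size of the cells, and each concrete segment type adds its own fixed class overhead on top (which is also why MemStoreSnapshot now records keySize()). A self-contained model of the accounting, with a stand-in constant rather than the real JVM-dependent ClassSize estimate:

    import java.util.concurrent.atomic.AtomicLong;

    public class SegmentSizeSketch {
        // stand-in for MutableSegment.DEEP_OVERHEAD
        // (Segment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP)
        static final long DEEP_OVERHEAD = 200;

        // sum of the heap sizes of the cells only, as in Segment after this patch
        private final AtomicLong size = new AtomicLong(0);

        long keySize() { return size.get(); }            // cell data only
        long size() { return keySize() + DEEP_OVERHEAD; } // cell data + class overhead
        void incSize(long delta) { size.addAndGet(delta); }

        public static void main(String[] args) {
            SegmentSizeSketch segment = new SegmentSizeSketch();
            segment.incSize(64); // account for one 64-byte cell
            System.out.println(segment.keySize()); // 64
            System.out.println(segment.size());    // 264
        }
    }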
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index b4d1d61ad28..01f3da9776d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import com.google.common.annotations.VisibleForTesting;
@@ -47,28 +47,31 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public abstract class Segment {
- private static final Log LOG = LogFactory.getLog(Segment.class);
+ static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ + 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, size, timeRangeTracker
+ + Bytes.SIZEOF_LONG // minSequenceId
+ + Bytes.SIZEOF_BOOLEAN); // tagsPresent
+ public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
+ + ClassSize.CELL_SET + ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER;
+
private AtomicReference<CellSet> cellSet = new AtomicReference<CellSet>();
private final CellComparator comparator;
private long minSequenceId;
- private volatile MemStoreLAB memStoreLAB;
- /* The size includes everything allocated for this segment,
- * use keySize() to get only size of the cells */
+ private MemStoreLAB memStoreLAB;
+ // Sum of the sizes of all Cells added to this Segment. Each Cell's heapSize is counted;
+ // the heap overhead of this class itself is not included.
protected final AtomicLong size;
+ protected final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent;
- private final TimeRangeTracker timeRangeTracker;
- protected long constantCellMetaDataSize;
- protected Segment(
- CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, long size,
- long constantCellSize) {
+ // This constructor is used to create empty Segments.
+ protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
this.cellSet.set(cellSet);
this.comparator = comparator;
this.minSequenceId = Long.MAX_VALUE;
this.memStoreLAB = memStoreLAB;
- this.size = new AtomicLong(size);
+ this.size = new AtomicLong(0);
this.tagsPresent = false;
- this.constantCellMetaDataSize = constantCellSize;
this.timeRangeTracker = new TimeRangeTracker();
}
@@ -77,9 +80,8 @@ public abstract class Segment {
this.comparator = segment.getComparator();
this.minSequenceId = segment.getMinSequenceId();
this.memStoreLAB = segment.getMemStoreLAB();
- this.size = new AtomicLong(segment.getSize());
+ this.size = new AtomicLong(segment.keySize());
this.tagsPresent = segment.isTagsPresent();
- this.constantCellMetaDataSize = segment.getConstantCellMetaDataSize();
this.timeRangeTracker = segment.getTimeRangeTracker();
}
@@ -100,7 +102,6 @@ public abstract class Segment {
}
/**
- * Returns whether the segment has any cells
* @return whether the segment has any cells
*/
public boolean isEmpty() {
@@ -108,7 +109,6 @@ public abstract class Segment {
}
/**
- * Returns number of cells in segment
* @return number of cells in segment
*/
public int getCellsCount() {
@@ -116,7 +116,6 @@ public abstract class Segment {
}
/**
- * Returns the first cell in the segment that has equal or greater key than the given cell
* @return the first cell in the segment that has equal or greater key than the given cell
*/
public Cell getFirstAfter(Cell cell) {
@@ -131,9 +130,8 @@ public abstract class Segment {
* Closing a segment before it is being discarded
*/
public void close() {
- MemStoreLAB mslab = getMemStoreLAB();
- if(mslab != null) {
- mslab.close();
+ if (this.memStoreLAB != null) {
+ this.memStoreLAB.close();
}
// do not set MSLab to null as scanners may still be reading the data here and need to decrease
// the counter when they finish
@@ -145,12 +143,12 @@ public abstract class Segment {
* @return either the given cell or its clone
*/
public Cell maybeCloneWithAllocator(Cell cell) {
- if (getMemStoreLAB() == null) {
+ if (this.memStoreLAB == null) {
return cell;
}
int len = getCellLength(cell);
- ByteRange alloc = getMemStoreLAB().allocateBytes(len);
+ ByteRange alloc = this.memStoreLAB.allocateBytes(len);
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
@@ -180,27 +178,17 @@ public abstract class Segment {
}
public void incScannerCount() {
- if(getMemStoreLAB() != null) {
- getMemStoreLAB().incScannerCount();
+ if (this.memStoreLAB != null) {
+ this.memStoreLAB.incScannerCount();
}
}
public void decScannerCount() {
- if(getMemStoreLAB() != null) {
- getMemStoreLAB().decScannerCount();
+ if (this.memStoreLAB != null) {
+ this.memStoreLAB.decScannerCount();
}
}
- /**
- * Setting the heap size of the segment - used to account for different class overheads
- * @return this object
- */
-
- public Segment setSize(long size) {
- this.size.set(size);
- return this;
- }
-
/**
* Setting the CellSet of the segment - used only for flat immutable segment for setting
* immutable CellSet after its creation in immutable segment constructor
@@ -212,22 +200,23 @@ public abstract class Segment {
return this;
}
- /* return only cell's heap size */
- public abstract long keySize();
+ /**
+ * @return the sum of the heap sizes of all cells in the segment
+ */
+ public long keySize() {
+ return this.size.get();
+ }
/**
- * Returns the heap size of the segment
* @return the heap size of the segment
*/
- public long getSize() {
- return size.get();
- }
+ public abstract long size();
/**
* Updates the heap size counter of the segment by the given delta
*/
public void incSize(long delta) {
- size.addAndGet(delta);
+ this.size.addAndGet(delta);
}
public long getMinSequenceId() {
@@ -260,7 +249,6 @@ public abstract class Segment {
}
/**
- * Returns a set of all cells in the segment
* @return a set of all cells in the segment
*/
protected CellSet getCellSet() {
@@ -302,6 +290,11 @@ public abstract class Segment {
return s;
}
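+
+ /**
+ * Calculates how the heap size changes when adding a cell. Includes the
+ * overhead of the backing skip-list map entry.
+ * @param cell the cell being added
+ * @param succ true only if the cell was actually added to the cell set
+ * @return the change in heap size, or 0 if the cell was not added
+ */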
+ protected long heapSizeChange(Cell cell, boolean succ) {
+ return succ ? ClassSize.align(
+ ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)) : 0;
+ }
+
/**
* Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment
@@ -312,7 +305,7 @@ public abstract class Segment {
}
@VisibleForTesting
- public MemStoreLAB getMemStoreLAB() {
+ MemStoreLAB getMemStoreLAB() {
return memStoreLAB;
}
@@ -326,29 +319,13 @@ public abstract class Segment {
}
}
- /*
- * Calculate how the MemStore size has changed. Includes overhead of the
- * backing Map.
- * @param cell
- * @param notPresent True if the cell was NOT present in the set.
- * @return change in size
- */
- protected long heapSizeChange(final Cell cell, final boolean notPresent){
- return
- notPresent ?
- ClassSize.align(constantCellMetaDataSize + CellUtil.estimatedHeapSizeOf(cell)) : 0;
- }
-
- public long getConstantCellMetaDataSize() {
- return this.constantCellMetaDataSize;
- }
-
@Override
public String toString() {
String res = "Store segment of type "+this.getClass().getName()+"; ";
res += "isEmpty "+(isEmpty()?"yes":"no")+"; ";
res += "cellCount "+getCellsCount()+"; ";
- res += "size "+getSize()+"; ";
+ res += "cellsSize "+keySize()+"; ";
+ res += "heapSize "+size()+"; ";
res += "Min ts "+getMinTimestamp()+"; ";
return res;
}
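
The rewritten heapSizeChange() charges the skip-list map entry overhead plus the cell's estimated heap size, rounded up to the JVM's 8-byte object granularity. A minimal sketch of that arithmetic; align() here mirrors what ClassSize.align does, and the entry constant is a stand-in value:

    public class HeapSizeChangeSketch {
        // stand-in for ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
        static final long MAP_ENTRY_OVERHEAD = 48;

        // round up to a multiple of 8 bytes, as ClassSize.align does
        static long align(long num) { return ((num + 7) / 8) * 8; }

        static long heapSizeChange(long cellHeapSize, boolean added) {
            return added ? align(MAP_ENTRY_OVERHEAD + cellHeapSize) : 0;
        }

        public static void main(String[] args) {
            System.out.println(heapSizeChange(61, true));  // 112: 48 + 61 = 109, aligned up
            System.out.println(heapSizeChange(61, false)); // 0: cell was not added to the set
        }
    }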
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 6351f1358b9..510ebbdea71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -39,6 +39,7 @@ public final class SegmentFactory {
private SegmentFactory() {}
private static SegmentFactory instance = new SegmentFactory();
+
public static SegmentFactory instance() {
return instance;
}
@@ -46,47 +47,43 @@ public final class SegmentFactory {
// create skip-list-based (non-flat) immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreCompactorIterator iterator) {
- MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
- return
- new ImmutableSegment(comparator, iterator, memStoreLAB);
+ return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf));
+ }
+
+ // create new flat immutable segment from compacting old immutable segment
+ public ImmutableSegment createImmutableSegment(final Configuration conf,
+ final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells,
+ ImmutableSegment.Type segmentType) throws IOException {
+ Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED,
+ "wrong immutable segment type");
+ return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf), numOfCells,
+ segmentType);
}
// create empty immutable segment
- public ImmutableSegment createImmutableSegment(CellComparator comparator, long size) {
- MutableSegment segment = generateMutableSegment(null, comparator, null, size);
+ public ImmutableSegment createImmutableSegment(CellComparator comparator) {
+ MutableSegment segment = generateMutableSegment(null, comparator, null);
return createImmutableSegment(segment);
}
- // create immutable segment from mutable
+ // create immutable segment from mutable segment
public ImmutableSegment createImmutableSegment(MutableSegment segment) {
return new ImmutableSegment(segment);
}
// create mutable segment
- public MutableSegment createMutableSegment(final Configuration conf,
- CellComparator comparator, long size) {
+ public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
- return generateMutableSegment(conf, comparator, memStoreLAB, size);
- }
-
- // create new flat immutable segment from compacting old immutable segment
- public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
- MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType)
- throws IOException {
- Preconditions.checkArgument(
- segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type");
- MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
- return
- new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType);
+ return generateMutableSegment(conf, comparator, memStoreLAB);
}
//****** private methods to instantiate concrete store segments **********//
- private MutableSegment generateMutableSegment(
- final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) {
+ private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
+ MemStoreLAB memStoreLAB) {
// TBD use configuration to set type of segment
CellSet set = new CellSet(comparator);
- return new MutableSegment(set, comparator, memStoreLAB, size);
+ return new MutableSegment(set, comparator, memStoreLAB);
}
private MemStoreLAB getMemStoreLAB(Configuration conf) {
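
Because segments now start at size zero with their overheads published as constants, the factory methods no longer thread an initial size argument through. A miniature of the resulting shape, using a toy segment class rather than the real HBase types:

    public final class FactorySketch {
        static final class ToySegment {
            static final long DEEP_OVERHEAD = 200; // stand-in class overhead
            long size() { return DEEP_OVERHEAD; }  // no cells yet
        }

        private FactorySketch() {}
        private static final FactorySketch INSTANCE = new FactorySketch();
        public static FactorySketch instance() { return INSTANCE; }

        // no initial-size parameter any more: a new segment always starts empty
        public ToySegment createMutableSegment() { return new ToySegment(); }

        public static void main(String[] args) {
            System.out.println(FactorySketch.instance().createMutableSegment().size()); // 200
        }
    }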
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index e7d6661d31a..6e8f831e744 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -40,6 +40,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -49,6 +50,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.junit.Assert.assertEquals;
@@ -310,19 +312,107 @@ public class TestHeapSize {
// DefaultMemStore Deep Overhead
actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
+ if (expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ assertEquals(expected, actual);
+ }
+
+ // CompactingMemStore Deep Overhead
+ cl = CompactingMemStore.class;
+ actual = CompactingMemStore.DEEP_OVERHEAD;
+ expected = ClassSize.estimateBase(cl, false);
+ expected += ClassSize.estimateBase(AtomicBoolean.class, false);
+ expected += ClassSize.estimateBase(AtomicBoolean.class, false);
+ expected += ClassSize.estimateBase(CompactionPipeline.class, false);
+ expected += ClassSize.estimateBase(LinkedList.class, false);
+ expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
+ expected += ClassSize.estimateBase(AtomicBoolean.class, false);
+ if (expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ ClassSize.estimateBase(AtomicBoolean.class, true);
+ ClassSize.estimateBase(AtomicBoolean.class, true);
+ ClassSize.estimateBase(CompactionPipeline.class, true);
+ ClassSize.estimateBase(LinkedList.class, true);
+ ClassSize.estimateBase(MemStoreCompactor.class, true);
+ ClassSize.estimateBase(AtomicBoolean.class, true);
+ assertEquals(expected, actual);
+ }
+
+ // Segment Deep overhead
+ cl = Segment.class;
+ actual = Segment.DEEP_OVERHEAD;
+ expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
+ expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
- expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
- if(expected != actual) {
+ if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
+ ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
- ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
assertEquals(expected, actual);
}
+ // MutableSegment Deep overhead
+ cl = MutableSegment.class;
+ actual = MutableSegment.DEEP_OVERHEAD;
+ expected = ClassSize.estimateBase(cl, false);
+ expected += ClassSize.estimateBase(AtomicLong.class, false);
+ expected += ClassSize.estimateBase(AtomicReference.class, false);
+ expected += ClassSize.estimateBase(CellSet.class, false);
+ expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
+ expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
+ if (expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ ClassSize.estimateBase(AtomicLong.class, true);
+ ClassSize.estimateBase(AtomicReference.class, true);
+ ClassSize.estimateBase(CellSet.class, true);
+ ClassSize.estimateBase(TimeRangeTracker.class, true);
+ ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
+ assertEquals(expected, actual);
+ }
+
+ // ImmutableSegment Deep overhead
+ cl = ImmutableSegment.class;
+ actual = ImmutableSegment.DEEP_OVERHEAD_CSLM;
+ expected = ClassSize.estimateBase(cl, false);
+ expected += ClassSize.estimateBase(AtomicLong.class, false);
+ expected += ClassSize.estimateBase(AtomicReference.class, false);
+ expected += ClassSize.estimateBase(CellSet.class, false);
+ expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
+ expected += ClassSize.estimateBase(TimeRange.class, false);
+ expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
+ if (expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ ClassSize.estimateBase(AtomicLong.class, true);
+ ClassSize.estimateBase(AtomicReference.class, true);
+ ClassSize.estimateBase(CellSet.class, true);
+ ClassSize.estimateBase(TimeRangeTracker.class, true);
+ ClassSize.estimateBase(TimeRange.class, true);
+ ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
+ assertEquals(expected, actual);
+ }
+ actual = ImmutableSegment.DEEP_OVERHEAD_CAM;
+ expected = ClassSize.estimateBase(cl, false);
+ expected += ClassSize.estimateBase(AtomicLong.class, false);
+ expected += ClassSize.estimateBase(AtomicReference.class, false);
+ expected += ClassSize.estimateBase(CellSet.class, false);
+ expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
+ expected += ClassSize.estimateBase(TimeRange.class, false);
+ expected += ClassSize.estimateBase(CellArrayMap.class, false);
+ if (expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ ClassSize.estimateBase(AtomicLong.class, true);
+ ClassSize.estimateBase(AtomicReference.class, true);
+ ClassSize.estimateBase(CellSet.class, true);
+ ClassSize.estimateBase(TimeRangeTracker.class, true);
+ ClassSize.estimateBase(TimeRange.class, true);
+ ClassSize.estimateBase(CellArrayMap.class, true);
+ assertEquals(expected, actual);
+ }
+
// Store Overhead
cl = HStore.class;
actual = HStore.FIXED_OVERHEAD;
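
Each new block in TestHeapSize follows one pattern: sum reflection-based estimates for the class and every heap-allocated field it owns, compare against the hand-maintained constant, and on mismatch re-run the estimates verbosely so the log shows which component drifted. A self-contained analogue of that check, where estimateBase is a toy stand-in for ClassSize.estimateBase:

    public class OverheadCheckSketch {
        // toy estimator: 16-byte object header plus 8 bytes per reference field
        static long estimateBase(int referenceFields, boolean verbose) {
            long estimate = 16 + 8L * referenceFields;
            if (verbose) {
                System.out.println("estimate for " + referenceFields + " refs = " + estimate);
            }
            return estimate;
        }

        public static void main(String[] args) {
            long actual = 56;                       // hand-maintained constant under test
            long expected = estimateBase(5, false); // e.g. five reference fields
            if (expected != actual) {
                estimateBase(5, true); // verbose re-run, mirroring the test's diagnostic step
                throw new AssertionError(expected + " != " + actual);
            }
            System.out.println("constant matches estimate: " + actual);
        }
    }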
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index db0205e3a1b..211a6d89d08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -377,7 +377,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
throws IOException {
// Save off old state.
- long oldHistorySize = hmc.getSnapshot().getSize();
+ long oldHistorySize = hmc.getSnapshot().keySize();
long prevTimeStamp = hmc.timeOfOldestEdit();
hmc.snapshot();
@@ -547,10 +547,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf1, 3, val));
assertEquals(3, memstore.getActive().getCellsCount());
- while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
- Threads.sleep(10);
- }
-
assertTrue(chunkPool.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
@@ -597,9 +593,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
long size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
- while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
- Threads.sleep(10);
- }
assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -625,9 +618,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
long size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
- while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
- Threads.sleep(1000);
- }
int counter = 0;
for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount();
@@ -641,9 +631,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
- while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
- Threads.sleep(10);
- }
assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -672,9 +659,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
String tstStr = "\n\nFlushable size after first flush in memory:" + size
+ ". Is MemStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory();
- while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
- Threads.sleep(10);
- }
assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
@@ -719,7 +703,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
- long size = hmc.getActive().getSize();
+ long size = hmc.getActive().keySize();
for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts
@@ -729,7 +713,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
hmc.add(kv);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
}
- regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size);
+ regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().keySize() - size);
}
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 1933343b001..fefe2c113c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -333,7 +333,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
- long size = hmc.getActive().getSize();//
+ long size = hmc.getActive().size();
for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts
@@ -343,7 +343,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
hmc.add(kv);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
}
- regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size);//
+ regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);
}
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 1615b993bc9..6bfaa592f08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -172,8 +172,9 @@ public class TestPerColumnFamilyFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
- assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
- + cf2MemstoreSize + cf3MemstoreSize);
+ assertEquals(
+ totalMemstoreSize + (3 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
+ cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize);
// Flush!
region.flush(false);
@@ -192,7 +193,7 @@ public class TestPerColumnFamilyFlush {
// We should have cleared out only CF1, since we chose the flush thresholds
// and number of puts accordingly.
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
// Nothing should have happened to CF2, ...
assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
// ... or CF3
@@ -201,8 +202,9 @@ public class TestPerColumnFamilyFlush {
// LSN in the memstore of CF2.
assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
// Of course, this should hold too.
- assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
- + cf3MemstoreSize);
+ assertEquals(
+ totalMemstoreSize + (2 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
+ cf2MemstoreSize + cf3MemstoreSize);
// Now add more puts (mostly for CF2), so that we only flush CF2 this time.
for (int i = 1200; i < 2400; i++) {
@@ -229,11 +231,12 @@ public class TestPerColumnFamilyFlush {
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// CF1 and CF2, both should be absent.
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize);
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
- assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+ assertEquals(totalMemstoreSize + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
+ cf3MemstoreSize);
assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
// What happens when we hit the memstore limit, but we are not able to find
@@ -296,8 +299,9 @@ public class TestPerColumnFamilyFlush {
// The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3.
- assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
- + cf2MemstoreSize + cf3MemstoreSize);
+ assertEquals(
+ totalMemstoreSize + (3 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
+ cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize);
// Flush!
region.flush(false);
@@ -310,9 +314,9 @@ public class TestPerColumnFamilyFlush {
region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// Everything should have been cleared
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize);
assertEquals(0, totalMemstoreSize);
assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
HBaseTestingUtility.closeRegionAndWAL(region);
@@ -379,12 +383,13 @@ public class TestPerColumnFamilyFlush {
cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
// CF1 Should have been flushed
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
// CF2 and CF3 shouldn't have been flushed.
assertTrue(cf2MemstoreSize > 0);
assertTrue(cf3MemstoreSize > 0);
- assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
- + cf3MemstoreSize);
+ assertEquals(
+ totalMemstoreSize + (2 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
+ cf2MemstoreSize + cf3MemstoreSize);
// Wait for the RS report to go across to the master, so that the master
// is aware of which sequence ids have been flushed, before we kill the RS.
@@ -521,12 +526,12 @@ public class TestPerColumnFamilyFlush {
});
LOG.info("Finished waiting on flush after too many WALs...");
// Individual families should have been flushed.
- assertEquals(DefaultMemStore.DEEP_OVERHEAD,
- desiredRegion.getStore(FAMILY1).getMemStoreSize());
- assertEquals(DefaultMemStore.DEEP_OVERHEAD,
- desiredRegion.getStore(FAMILY2).getMemStoreSize());
- assertEquals(DefaultMemStore.DEEP_OVERHEAD,
- desiredRegion.getStore(FAMILY3).getMemStoreSize());
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ desiredRegion.getStore(FAMILY1).getMemStoreSize());
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ desiredRegion.getStore(FAMILY2).getMemStoreSize());
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ desiredRegion.getStore(FAMILY3).getMemStoreSize());
// let WAL cleanOldLogs
assertNull(getWAL(desiredRegion).rollWriter(true));
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
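
Every updated assertion in this test follows from one fact: an empty store no longer reports DefaultMemStore.DEEP_OVERHEAD alone but that plus MutableSegment.DEEP_OVERHEAD, since even an empty memstore owns one active mutable segment. The arithmetic, with stand-in values in place of the real JVM-dependent constants:

    public class EmptyStoreSizeSketch {
        static final long DEFAULT_MEMSTORE_DEEP_OVERHEAD = 120; // stand-in value
        static final long MUTABLE_SEGMENT_DEEP_OVERHEAD = 200;  // stand-in value

        public static void main(String[] args) {
            long emptyStoreSize = DEFAULT_MEMSTORE_DEEP_OVERHEAD + MUTABLE_SEGMENT_DEEP_OVERHEAD;
            long totalDataSize = 3000; // cell data spread across three column families
            // sum of per-CF memstore sizes = data + one empty-store overhead per CF
            System.out.println(totalDataSize + 3 * emptyStoreSize); // 3960
        }
    }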
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index a6c791243f8..74826b0b1b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -19,10 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Arrays;
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -31,19 +28,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -59,7 +51,6 @@ import static org.junit.Assert.assertTrue;
@Category({ RegionServerTests.class, LargeTests.class })
public class TestWalAndCompactingMemStoreFlush {
- private static final Log LOG = LogFactory.getLog(TestWalAndCompactingMemStoreFlush.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
@@ -201,12 +192,13 @@ public class TestWalAndCompactingMemStoreFlush {
// memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
- " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
- +
+ " CompactingMemStore.DEEP_OVERHEAD="+CompactingMemStore.DEEP_OVERHEAD +
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
- assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD,
+ assertEquals(msg,
+ totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!!!!!!!!!!!!!!!!!!!!!!
@@ -220,11 +212,6 @@ public class TestWalAndCompactingMemStoreFlush {
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
region.flush(false);
- // CF3 should be compacted so wait here to be sure the compaction is done
- while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
- .isMemStoreFlushingInMemory())
- Threads.sleep(10);
-
// Recalculate everything
long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@@ -239,8 +226,6 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
+ ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
- + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore
- .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+ "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
+ "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
+ ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
@@ -249,12 +234,13 @@ public class TestWalAndCompactingMemStoreFlush {
assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
// CF2 should become empty
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseII);
// verify that CF3 was flushed to memory and was compacted (this is approximation check)
- assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD +
- CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM >
- cf3MemstoreSizePhaseII);
+ assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
+ + ImmutableSegment.DEEP_OVERHEAD_CAM
+ + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
// CF3 was compacted and flattened!
assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI
@@ -315,7 +301,8 @@ public class TestWalAndCompactingMemStoreFlush {
// CF1's pipeline component (inserted before first flush) should be flushed to disk
// CF2 should be flushed to disk
assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseIV);
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@@ -340,12 +327,16 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
- assertTrue(DefaultMemStore.DEEP_OVERHEAD < cf1MemstoreSizePhaseV);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV);
+ assertTrue(
+ CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD < cf1MemstoreSizePhaseV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseV);
+ assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf3MemstoreSizePhaseV);
region.flush(true); // flush once again in order to be sure that everything is empty
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, region.getStore(FAMILY1).getMemStoreSize());
+ assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ region.getStore(FAMILY1).getMemStoreSize());
// What happens when we hit the memstore limit, but we are not able to find
// any Column Family above the threshold?
@@ -453,12 +444,12 @@ public class TestWalAndCompactingMemStoreFlush {
// memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
- " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
- +
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
- assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD,
+ assertEquals(msg,
+ totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!!!!!!!!!!!!!!!!!!!!!!
@@ -472,11 +463,6 @@ public class TestWalAndCompactingMemStoreFlush {
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
region.flush(false);
- // CF3 should be compacted so wait here to be sure the compaction is done
- while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
- .isMemStoreFlushingInMemory())
- Threads.sleep(10);
-
// Recalculate everything
long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@@ -491,24 +477,23 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
+ ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
- + ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore
- .DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+ "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
+ "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
+ ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
// CF1 was flushed to memory, but there is nothing to compact, should
// remain the same size plus renewed empty skip-list
- assertEquals(s, cf1MemstoreSizePhaseII,
- cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM);
+ assertEquals(s, cf1MemstoreSizePhaseII, cf1MemstoreSizePhaseI
+ + ImmutableSegment.DEEP_OVERHEAD_CAM + CompactionPipeline.ENTRY_OVERHEAD);
// CF2 should become empty
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseII);
// verify that CF3 was flushed to memory and was compacted (this is approximation check)
- assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD +
- CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM >
- cf3MemstoreSizePhaseII);
+ assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
+ + ImmutableSegment.DEEP_OVERHEAD_CAM
+ + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII);
@@ -564,9 +549,10 @@ public class TestWalAndCompactingMemStoreFlush {
// CF1's pipeline component (inserted before first flush) should be flushed to disk
// CF2 should be flushed to disk
- assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + DefaultMemStore.DEEP_OVERHEAD,
- cf1MemstoreSizePhaseIV);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV);
+ assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD
+ + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseIV);
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@@ -590,9 +576,12 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseV);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV);
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV);
+ assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf1MemstoreSizePhaseV);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseV);
+ assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf3MemstoreSizePhaseV);
// Because there is nothing in any memstore the WAL's LSN should be -1
assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM);
@@ -672,24 +661,17 @@ public class TestWalAndCompactingMemStoreFlush {
// memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
- " DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
- +
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
- assertEquals(msg, totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD,
+ assertEquals(msg,
+ totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
- // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
- while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
- .isMemStoreFlushingInMemory())
- Threads.sleep(10);
- while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
- .isMemStoreFlushingInMemory())
- Threads.sleep(10);
region.flush(false);
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@@ -701,7 +683,8 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
// CF2 should have been cleared
- assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII);
+ assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
+ cf2MemstoreSizePhaseII);
String s = "\n\n----------------------------------\n"
+ "Upon initial insert and flush, LSN of CF1 is:"
@@ -739,13 +722,6 @@ public class TestWalAndCompactingMemStoreFlush {
// Flush!
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
- // CF1 and CF3 should be compacted so wait here to be sure the compaction is done
- while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
- .isMemStoreFlushingInMemory())
- Threads.sleep(10);
- while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
- .isMemStoreFlushingInMemory())
- Threads.sleep(10);
region.flush(false);
long smallestSeqInRegionCurrentMemstorePhaseIV =
@@ -768,24 +744,7 @@ public class TestWalAndCompactingMemStoreFlush {
HBaseTestingUtility.closeRegionAndWAL(region);
}
- // Find the (first) region which has the specified name.
- private static Pair<Region, HRegionServer> getRegionWithName(TableName tableName) {
- MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
- List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
- for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
- HRegionServer hrs = rsts.get(i).getRegionServer();
- for (Region region : hrs.getOnlineRegions(tableName)) {
- return Pair.newPair(region, hrs);
- }
- }
- return null;
- }
-
private WAL getWAL(Region region) {
return ((HRegion)region).getWAL();
}
-
- private int getNumRolledLogFiles(Region region) {
- return ((FSHLog)getWAL(region)).getNumRolledLogFiles();
- }
}
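
The phase-II assertions above encode what an in-memory flush costs under this accounting: the cell data moves unchanged into a flat CellArrayMap pipeline segment, so the store's size grows by ImmutableSegment.DEEP_OVERHEAD_CAM plus CompactionPipeline.ENTRY_OVERHEAD (the renewed active segment's overhead replaces the old one's). A stand-in calculation:

    public class InMemoryFlushSizeSketch {
        static final long DEEP_OVERHEAD_CAM = 180; // stand-in for ImmutableSegment.DEEP_OVERHEAD_CAM
        static final long ENTRY_OVERHEAD = 40;     // stand-in for CompactionPipeline.ENTRY_OVERHEAD

        public static void main(String[] args) {
            long sizeBeforeFlush = 1000; // CompactingMemStore size before flushInMemory()
            // same cell data afterwards, plus the flat segment and one pipeline entry
            long sizeAfterFlush = sizeBeforeFlush + DEEP_OVERHEAD_CAM + ENTRY_OVERHEAD;
            System.out.println(sizeAfterFlush); // 1220
        }
    }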