HBASE-17373: Fixing bug in moving segments from compaction pipeline to snapshot

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
eshcar 2017-01-03 15:28:10 +02:00 committed by Michael Stack
parent c3d5f268cf
commit 69ce5967fd
6 changed files with 137 additions and 48 deletions

View File

@ -213,8 +213,10 @@ public class CompactingMemStore extends AbstractMemStore {
} }
} }
// the getSegments() method is used for tests only
@VisibleForTesting
@Override @Override
public List<Segment> getSegments() { protected List<Segment> getSegments() {
List<Segment> pipelineList = pipeline.getSegments(); List<Segment> pipelineList = pipeline.getSegments();
List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2); List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
list.add(this.active); list.add(this.active);
@ -266,6 +268,7 @@ public class CompactingMemStore extends AbstractMemStore {
long order = pipelineList.size(); long order = pipelineList.size();
// The list of elements in pipeline + the active element + the snapshot segment // The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element // TODO : This will change when the snapshot is made of more than one element
// The order is the Segment ordinal
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2); List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
list.add(this.active.getScanner(readPt, order + 1)); list.add(this.active.getScanner(readPt, order + 1));
for (Segment item : pipelineList) { for (Segment item : pipelineList) {
@ -374,10 +377,18 @@ public class CompactingMemStore extends AbstractMemStore {
} }
private void pushTailToSnapshot() { private void pushTailToSnapshot() {
ImmutableSegment tail = pipeline.pullTail(); VersionedSegmentsList segments = pipeline.getVersionedTail();
if (!tail.isEmpty()) { pushToSnapshot(segments.getStoreSegments());
this.snapshot = tail; pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
}
private void pushToSnapshot(List<ImmutableSegment> segments) {
if(segments.isEmpty()) return;
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
this.snapshot = segments.get(0);
return;
} }
// TODO else craete composite snapshot
} }
private RegionServicesForStores getRegionServices() { private RegionServicesForStores getRegionServices() {

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -45,18 +46,14 @@ public class CompactionPipeline {
public final static long FIXED_OVERHEAD = ClassSize public final static long FIXED_OVERHEAD = ClassSize
.align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST; public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
private final RegionServicesForStores region; private final RegionServicesForStores region;
private LinkedList<ImmutableSegment> pipeline; private LinkedList<ImmutableSegment> pipeline;
private long version; private long version;
private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
.createImmutableSegment((CellComparator) null);
public CompactionPipeline(RegionServicesForStores region) { public CompactionPipeline(RegionServicesForStores region) {
this.region = region; this.region = region;
this.pipeline = new LinkedList<ImmutableSegment>(); this.pipeline = new LinkedList<>();
this.version = 0; this.version = 0;
} }
@ -68,31 +65,33 @@ public class CompactionPipeline {
} }
} }
public ImmutableSegment pullTail() { public VersionedSegmentsList getVersionedList() {
synchronized (pipeline){ synchronized (pipeline){
if(pipeline.isEmpty()) { List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
return EMPTY_MEM_STORE_SEGMENT; return new VersionedSegmentsList(segmentList, version);
}
return removeLast();
} }
} }
public VersionedSegmentsList getVersionedList() { public VersionedSegmentsList getVersionedTail() {
synchronized (pipeline){ synchronized (pipeline){
LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline); List<ImmutableSegment> segmentList = new ArrayList<>();
VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version); if(!pipeline.isEmpty()) {
return res; segmentList.add(0, pipeline.getLast());
}
return new VersionedSegmentsList(segmentList, version);
} }
} }
/** /**
* Swaps the versioned list at the tail of the pipeline with the new compacted segment. * Swaps the versioned list at the tail of the pipeline with a new segment.
* Swapping only if there were no changes to the suffix of the list while it was compacted. * Swapping only if there were no changes to the suffix of the list since the version list was
* @param versionedList tail of the pipeline that was compacted * created.
* @param segment new compacted segment * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline
* @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
* removed.
* @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
* During index merge op this will be false and for compaction it will be true. * During index merge op this will be false and for compaction it will be true.
* @return true iff swapped tail with new compacted segment * @return true iff swapped tail with new segment
*/ */
public boolean swap( public boolean swap(
VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
@ -106,26 +105,32 @@ public class CompactionPipeline {
} }
suffix = versionedList.getStoreSegments(); suffix = versionedList.getStoreSegments();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Swapping pipeline suffix with compacted item. " int count = 0;
if(segment != null) {
count = segment.getCellsCount();
}
LOG.debug("Swapping pipeline suffix. "
+ "Just before the swap the number of segments in pipeline is:" + "Just before the swap the number of segments in pipeline is:"
+ versionedList.getStoreSegments().size() + versionedList.getStoreSegments().size()
+ ", and the number of cells in new segment is:" + segment.getCellsCount()); + ", and the number of cells in new segment is:" + count);
} }
swapSuffix(suffix,segment, closeSuffix); swapSuffix(suffix, segment, closeSuffix);
} }
if (region != null) { if (closeSuffix && region != null) {
// update the global memstore size counter // update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix); long suffixDataSize = getSegmentsKeySize(suffix);
long newDataSize = segment.keySize(); long newDataSize = 0;
if(segment != null) newDataSize = segment.keySize();
long dataSizeDelta = suffixDataSize - newDataSize; long dataSizeDelta = suffixDataSize - newDataSize;
long suffixHeapOverhead = getSegmentsHeapOverhead(suffix); long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
long newHeapOverhead = segment.heapOverhead(); long newHeapOverhead = 0;
if(segment != null) newHeapOverhead = segment.heapOverhead();
long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead; long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta)); region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: " LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
+ newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
+ " compacted item heap overhead: " + newHeapOverhead); + " new segment heap overhead: " + newHeapOverhead);
} }
} }
return true; return true;
@ -193,8 +198,7 @@ public class CompactionPipeline {
public List<Segment> getSegments() { public List<Segment> getSegments() {
synchronized (pipeline){ synchronized (pipeline){
List<Segment> res = new LinkedList<Segment>(pipeline); return new LinkedList<>(pipeline);
return res;
} }
} }
@ -230,12 +234,7 @@ public class CompactionPipeline {
} }
} }
pipeline.removeAll(suffix); pipeline.removeAll(suffix);
pipeline.addLast(segment); if(segment != null) pipeline.addLast(segment);
}
private ImmutableSegment removeLast() {
version++;
return pipeline.removeLast();
} }
private boolean addFirst(ImmutableSegment segment) { private boolean addFirst(ImmutableSegment segment) {

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -36,10 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public class VersionedSegmentsList { public class VersionedSegmentsList {
private final LinkedList<ImmutableSegment> storeSegments; private final List<ImmutableSegment> storeSegments;
private final long version; private final long version;
public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) { public VersionedSegmentsList(List<ImmutableSegment> storeSegments, long version) {
this.storeSegments = storeSegments; this.storeSegments = storeSegments;
this.version = version; this.version = version;
} }

View File

@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -75,11 +76,18 @@ public class TestAsyncTableGetMultiThreaded {
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
setUp(HColumnDescriptor.MemoryCompaction.NONE);
}
protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L); TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000); TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(memoryCompaction));
TEST_UTIL.startMiniCluster(5); TEST_UTIL.startMiniCluster(5);
SPLIT_KEYS = new byte[8][]; SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) { for (int i = 111; i < 999; i += 111) {
@ -103,11 +111,13 @@ public class TestAsyncTableGetMultiThreaded {
private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
while (!stop.get()) { while (!stop.get()) {
int i = ThreadLocalRandom.current().nextInt(COUNT); for (int i = 0; i < COUNT; i++) {
assertEquals(i, assertEquals(i,
Bytes.toInt( Bytes.toInt(
CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get() CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
.getValue(FAMILY, QUALIFIER))); .get()
.getValue(FAMILY, QUALIFIER)));
}
} }
} }

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableGetMultiThreadedWithBasicCompaction extends
TestAsyncTableGetMultiThreaded {
@BeforeClass
public static void setUp() throws Exception {
setUp(HColumnDescriptor.MemoryCompaction.BASIC);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableGetMultiThreadedWithEagerCompaction extends
TestAsyncTableGetMultiThreaded {
@BeforeClass
public static void setUp() throws Exception {
setUp(HColumnDescriptor.MemoryCompaction.EAGER);
}
}