HBASE-27543 May be miss data when use mob
This commit is contained in:
parent
382681e2d6
commit
50389904c9
|
@ -128,13 +128,13 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
|
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
|
||||||
protected static class FileDetails {
|
public static class FileDetails {
|
||||||
/** Maximum key count after compaction (for blooms) */
|
/** Maximum key count after compaction (for blooms) */
|
||||||
public long maxKeyCount = 0;
|
public long maxKeyCount = 0;
|
||||||
/** Earliest put timestamp if major compaction */
|
/** Earliest put timestamp if major compaction */
|
||||||
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
|
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
|
||||||
/** Latest put timestamp */
|
/** Latest put timestamp */
|
||||||
public long latestPutTs = HConstants.LATEST_TIMESTAMP;
|
public long latestPutTs = 0;
|
||||||
/** The last key in the files we're compacting. */
|
/** The last key in the files we're compacting. */
|
||||||
public long maxSeqId = 0;
|
public long maxSeqId = 0;
|
||||||
/** Latest memstore read point found in any of the involved files */
|
/** Latest memstore read point found in any of the involved files */
|
||||||
|
@ -154,11 +154,12 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
* @parma major If major compaction
|
* @parma major If major compaction
|
||||||
* @return The result.
|
* @return The result.
|
||||||
*/
|
*/
|
||||||
private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
|
static FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, long keepSeqIdPeriod,
|
||||||
boolean major) throws IOException {
|
boolean allFiles, boolean major, Compression.Algorithm majorCompactionCompression,
|
||||||
|
Compression.Algorithm minorCompactionCompression) throws IOException {
|
||||||
FileDetails fd = new FileDetails();
|
FileDetails fd = new FileDetails();
|
||||||
long oldestHFileTimestampToKeepMVCC =
|
long oldestHFileTimestampToKeepMVCC =
|
||||||
EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
|
EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * keepSeqIdPeriod);
|
||||||
|
|
||||||
for (HStoreFile file : filesToCompact) {
|
for (HStoreFile file : filesToCompact) {
|
||||||
if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
|
if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
|
||||||
|
@ -216,8 +217,9 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tmp = fileInfo.get(TIMERANGE_KEY);
|
tmp = fileInfo.get(TIMERANGE_KEY);
|
||||||
fd.latestPutTs =
|
long latestPutTs =
|
||||||
tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
|
tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
|
||||||
|
fd.latestPutTs = Math.max(fd.latestPutTs, latestPutTs);
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Compacting {}, keycount={}, bloomtype={}, size={}, "
|
"Compacting {}, keycount={}, bloomtype={}, size={}, "
|
||||||
+ "encoding={}, compression={}, seqNum={}{}",
|
+ "encoding={}, compression={}, seqNum={}{}",
|
||||||
|
@ -328,7 +330,9 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
protected final List<Path> compact(final CompactionRequestImpl request,
|
protected final List<Path> compact(final CompactionRequestImpl request,
|
||||||
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
|
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
|
||||||
ThroughputController throughputController, User user) throws IOException {
|
ThroughputController throughputController, User user) throws IOException {
|
||||||
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
|
FileDetails fd =
|
||||||
|
getFileDetails(request.getFiles(), keepSeqIdPeriod, request.isAllFiles(), request.isMajor(),
|
||||||
|
majorCompactionCompression, minorCompactionCompression);
|
||||||
|
|
||||||
// Find the smallest read point across all the Scanners.
|
// Find the smallest read point across all the Scanners.
|
||||||
long smallestReadPoint = getSmallestReadPoint();
|
long smallestReadPoint = getSmallestReadPoint();
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
|
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestFileDetails {
|
||||||
|
|
||||||
|
@Test public void testLatestPutTs() throws IOException {
|
||||||
|
List<HStoreFile> sfs = new ArrayList<>(3);
|
||||||
|
Map<byte[], byte[]> fileInfo = new HashMap<>();
|
||||||
|
TimeRangeTracker tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 3000);
|
||||||
|
fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker));
|
||||||
|
sfs.add(createStoreFile(fileInfo));
|
||||||
|
tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 2000);
|
||||||
|
fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker));
|
||||||
|
sfs.add(createStoreFile(fileInfo));
|
||||||
|
tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 1000);
|
||||||
|
fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker));
|
||||||
|
sfs.add(createStoreFile(fileInfo));
|
||||||
|
|
||||||
|
FileDetails fd = Compactor.getFileDetails(sfs, HConstants.MIN_KEEP_SEQID_PERIOD, false, false,
|
||||||
|
Compression.Algorithm.NONE, Compression.Algorithm.NONE);
|
||||||
|
assertEquals(3000, fd.latestPutTs);
|
||||||
|
|
||||||
|
// when TIMERANGE_KEY is null
|
||||||
|
fileInfo.clear();
|
||||||
|
sfs.add(createStoreFile(fileInfo));
|
||||||
|
fd = Compactor.getFileDetails(sfs, HConstants.MIN_KEEP_SEQID_PERIOD, false, false,
|
||||||
|
Compression.Algorithm.NONE, Compression.Algorithm.NONE);
|
||||||
|
assertEquals(HConstants.LATEST_TIMESTAMP, fd.latestPutTs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HStoreFile createStoreFile(Map<byte[], byte[]> fileInfo)
|
||||||
|
throws IOException {
|
||||||
|
HStoreFile sf = Mockito.mock(HStoreFile.class);
|
||||||
|
Mockito.doReturn(System.currentTimeMillis()).when(sf).getModificationTimestamp();
|
||||||
|
Mockito.doReturn(0L).when(sf).getMaxSequenceId();
|
||||||
|
StoreFileReader reader = Mockito.mock(StoreFileReader.class);
|
||||||
|
Mockito.doReturn(0L).when(reader).getEntries();
|
||||||
|
Mockito.doReturn(new HashMap<>(fileInfo)).when(reader).loadFileInfo();
|
||||||
|
Mockito.doReturn(0L).when(reader).length();
|
||||||
|
Mockito.doReturn(false).when(reader).isBulkLoaded();
|
||||||
|
Mockito.doReturn(BloomType.NONE).when(reader).getBloomFilterType();
|
||||||
|
HFile.Reader hfr = Mockito.mock(HFile.Reader.class);
|
||||||
|
Mockito.doReturn(DataBlockEncoding.NONE).when(hfr).getDataBlockEncoding();
|
||||||
|
Mockito.doReturn(hfr).when(reader).getHFileReader();
|
||||||
|
Mockito.doReturn(reader).when(sf).getReader();
|
||||||
|
return sf;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue