HBASE-24382 Flush partial stores of region filtered by seqId when archive wal due to too many wals (#2049)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
86f00e4749
commit
bf368a01bc
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.region;
|
||||||
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
|
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
|
||||||
* roller logic by our own.
|
* roller logic by our own.
|
||||||
* <p/>
|
* <p/>
|
||||||
* We can reuse most of the code for normal wal roller, the only difference is that there is only
|
* We can reuse most of the code for normal wal roller, the only difference is that there is only
|
||||||
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
|
* one region, so in {@link #scheduleFlush(String, List)} method we can just schedule flush
|
||||||
* local region.
|
* for the master local region.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
|
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
|
||||||
|
@ -79,7 +80,7 @@ public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void scheduleFlush(String encodedRegionName) {
|
protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
|
||||||
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
|
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
|
||||||
if (flusher != null) {
|
if (flusher != null) {
|
||||||
flusher.requestFlush();
|
flusher.requestFlush();
|
||||||
|
|
|
@ -45,5 +45,4 @@ public abstract class FlushPolicy extends Configured {
|
||||||
* @return the stores need to be flushed.
|
* @return the stores need to be flushed.
|
||||||
*/
|
*/
|
||||||
public abstract Collection<HStore> selectStoresToFlush();
|
public abstract Collection<HStore> selectStoresToFlush();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -30,22 +32,28 @@ public interface FlushRequester {
|
||||||
* Tell the listener the cache needs to be flushed.
|
* Tell the listener the cache needs to be flushed.
|
||||||
*
|
*
|
||||||
* @param region the Region requesting the cache flush
|
* @param region the Region requesting the cache flush
|
||||||
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
|
|
||||||
* rolling.
|
|
||||||
* @return true if our region is added into the queue, false otherwise
|
* @return true if our region is added into the queue, false otherwise
|
||||||
*/
|
*/
|
||||||
boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
|
boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tell the listener the cache needs to be flushed.
|
||||||
|
*
|
||||||
|
* @param region the Region requesting the cache flush
|
||||||
|
* @param families stores of region to flush, if null then use flush policy
|
||||||
|
* @return true if our region is added into the queue, false otherwise
|
||||||
|
*/
|
||||||
|
boolean requestFlush(HRegion region, List<byte[]> families,
|
||||||
|
FlushLifeCycleTracker tracker);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tell the listener the cache needs to be flushed after a delay
|
* Tell the listener the cache needs to be flushed after a delay
|
||||||
*
|
*
|
||||||
* @param region the Region requesting the cache flush
|
* @param region the Region requesting the cache flush
|
||||||
* @param delay after how much time should the flush happen
|
* @param delay after how much time should the flush happen
|
||||||
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
|
|
||||||
* rolling.
|
|
||||||
* @return true if our region is added into the queue, false otherwise
|
* @return true if our region is added into the queue, false otherwise
|
||||||
*/
|
*/
|
||||||
boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
|
boolean requestDelayedFlush(HRegion region, long delay);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a FlushRequestListener
|
* Register a FlushRequestListener
|
||||||
|
|
|
@ -2322,7 +2322,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
*
|
*
|
||||||
* <p>This method may block for some time, so it should not be called from a
|
* <p>This method may block for some time, so it should not be called from a
|
||||||
* time-sensitive thread.
|
* time-sensitive thread.
|
||||||
* @param force whether we want to force a flush of all stores
|
* @param flushAllStores whether we want to force a flush of all stores
|
||||||
* @return FlushResult indicating whether the flush was successful or not and if
|
* @return FlushResult indicating whether the flush was successful or not and if
|
||||||
* the region needs compacting
|
* the region needs compacting
|
||||||
*
|
*
|
||||||
|
@ -2330,8 +2330,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
* because a snapshot was not properly persisted.
|
* because a snapshot was not properly persisted.
|
||||||
*/
|
*/
|
||||||
// TODO HBASE-18905. We might have to expose a requestFlush API for CPs
|
// TODO HBASE-18905. We might have to expose a requestFlush API for CPs
|
||||||
public FlushResult flush(boolean force) throws IOException {
|
public FlushResult flush(boolean flushAllStores) throws IOException {
|
||||||
return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
|
return flushcache(flushAllStores, false, FlushLifeCycleTracker.DUMMY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface FlushResult {
|
public interface FlushResult {
|
||||||
|
@ -2354,6 +2354,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
boolean isCompactionNeeded();
|
boolean isCompactionNeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
|
||||||
|
FlushLifeCycleTracker tracker) throws IOException {
|
||||||
|
List families = null;
|
||||||
|
if (flushAllStores) {
|
||||||
|
families = new ArrayList();
|
||||||
|
families.addAll(this.getTableDescriptor().getColumnFamilyNames());
|
||||||
|
}
|
||||||
|
return this.flushcache(families, writeFlushRequestWalMarker, tracker);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush the cache.
|
* Flush the cache.
|
||||||
*
|
*
|
||||||
|
@ -2367,7 +2377,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
*
|
*
|
||||||
* <p>This method may block for some time, so it should not be called from a
|
* <p>This method may block for some time, so it should not be called from a
|
||||||
* time-sensitive thread.
|
* time-sensitive thread.
|
||||||
* @param forceFlushAllStores whether we want to flush all stores
|
* @param families stores of region to flush.
|
||||||
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
|
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
|
||||||
* @param tracker used to track the life cycle of this flush
|
* @param tracker used to track the life cycle of this flush
|
||||||
* @return whether the flush is success and whether the region needs compacting
|
* @return whether the flush is success and whether the region needs compacting
|
||||||
|
@ -2377,8 +2387,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
|
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
|
||||||
* caller MUST abort after this.
|
* caller MUST abort after this.
|
||||||
*/
|
*/
|
||||||
public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
|
public FlushResultImpl flushcache(List<byte[]> families,
|
||||||
FlushLifeCycleTracker tracker) throws IOException {
|
boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException {
|
||||||
// fail-fast instead of waiting on the lock
|
// fail-fast instead of waiting on the lock
|
||||||
if (this.closing.get()) {
|
if (this.closing.get()) {
|
||||||
String msg = "Skipping flush on " + this + " because closing";
|
String msg = "Skipping flush on " + this + " because closing";
|
||||||
|
@ -2425,8 +2435,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Collection<HStore> specificStoresToFlush =
|
// The reason that we do not always use flushPolicy is, when the flush is
|
||||||
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
|
// caused by logRoller, we should select stores which must be flushed
|
||||||
|
// rather than could be flushed.
|
||||||
|
Collection<HStore> specificStoresToFlush = null;
|
||||||
|
if (families != null) {
|
||||||
|
specificStoresToFlush = getSpecificStores(families);
|
||||||
|
} else {
|
||||||
|
specificStoresToFlush = flushPolicy.selectStoresToFlush();
|
||||||
|
}
|
||||||
FlushResultImpl fs =
|
FlushResultImpl fs =
|
||||||
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
|
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
|
||||||
|
|
||||||
|
@ -2456,6 +2473,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get stores which matches the specified families
|
||||||
|
*
|
||||||
|
* @return the stores need to be flushed.
|
||||||
|
*/
|
||||||
|
private Collection<HStore> getSpecificStores(List<byte[]> families) {
|
||||||
|
Collection<HStore> specificStoresToFlush = new ArrayList<>();
|
||||||
|
for (byte[] family : families) {
|
||||||
|
specificStoresToFlush.add(stores.get(family));
|
||||||
|
}
|
||||||
|
return specificStoresToFlush;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should the store be flushed because it is old enough.
|
* Should the store be flushed because it is old enough.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -8916,7 +8946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
if (shouldFlush) {
|
if (shouldFlush) {
|
||||||
// Make request outside of synchronize block; HBASE-818.
|
// Make request outside of synchronize block; HBASE-818.
|
||||||
this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
|
this.rsServices.getFlushRequester().requestFlush(this, tracker);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
|
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
|
||||||
}
|
}
|
||||||
|
|
|
@ -1857,7 +1857,7 @@ public class HRegionServer extends Thread implements
|
||||||
//Throttle the flushes by putting a delay. If we don't throttle, and there
|
//Throttle the flushes by putting a delay. If we don't throttle, and there
|
||||||
//is a balanced write-load on the regions in a table, we might end up
|
//is a balanced write-load on the regions in a table, we might end up
|
||||||
//overwhelming the filesystem with too many flushes at once.
|
//overwhelming the filesystem with too many flushes at once.
|
||||||
if (requester.requestDelayedFlush(r, randomDelay, false)) {
|
if (requester.requestDelayedFlush(r, randomDelay)) {
|
||||||
LOG.info("{} requesting flush of {} because {} after random delay {} ms",
|
LOG.info("{} requesting flush of {} because {} after random delay {} ms",
|
||||||
getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
|
getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
|
||||||
randomDelay);
|
randomDelay);
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
|
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -45,7 +47,7 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
|
||||||
super("LogRoller", services.getConfiguration(), services);
|
super("LogRoller", services.getConfiguration(), services);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void scheduleFlush(String encodedRegionName) {
|
protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
|
||||||
RegionServerServices services = this.abortable;
|
RegionServerServices services = this.abortable;
|
||||||
HRegion r = (HRegion) services.getRegion(encodedRegionName);
|
HRegion r = (HRegion) services.getRegion(encodedRegionName);
|
||||||
if (r == null) {
|
if (r == null) {
|
||||||
|
@ -58,8 +60,8 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
|
||||||
encodedRegionName, r);
|
encodedRegionName, r);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// force flushing all stores to clean old logs
|
// flush specified stores to clean old logs
|
||||||
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
|
requester.requestFlush(r, families, FlushLifeCycleTracker.DUMMY);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -285,7 +285,7 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
|
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
|
||||||
", Region memstore size=" +
|
", Region memstore size=" +
|
||||||
TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
|
TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
|
||||||
flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
|
flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
|
||||||
|
|
||||||
if (!flushedOne) {
|
if (!flushedOne) {
|
||||||
LOG.info("Excluding unflushable region " + regionToFlush +
|
LOG.info("Excluding unflushable region " + regionToFlush +
|
||||||
|
@ -453,13 +453,18 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
|
public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
|
||||||
|
return this.requestFlush(r, null, tracker);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requestFlush(HRegion r, List<byte[]> families,
|
||||||
FlushLifeCycleTracker tracker) {
|
FlushLifeCycleTracker tracker) {
|
||||||
synchronized (regionsInQueue) {
|
synchronized (regionsInQueue) {
|
||||||
if (!regionsInQueue.containsKey(r)) {
|
if (!regionsInQueue.containsKey(r)) {
|
||||||
// This entry has no delay so it will be added at the top of the flush
|
// This entry has no delay so it will be added at the top of the flush
|
||||||
// queue. It'll come out near immediately.
|
// queue. It'll come out near immediately.
|
||||||
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
|
FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
|
||||||
this.regionsInQueue.put(r, fqe);
|
this.regionsInQueue.put(r, fqe);
|
||||||
this.flushQueue.add(fqe);
|
this.flushQueue.add(fqe);
|
||||||
r.incrementFlushesQueuedCount();
|
r.incrementFlushesQueuedCount();
|
||||||
|
@ -472,12 +477,12 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
|
public boolean requestDelayedFlush(HRegion r, long delay) {
|
||||||
synchronized (regionsInQueue) {
|
synchronized (regionsInQueue) {
|
||||||
if (!regionsInQueue.containsKey(r)) {
|
if (!regionsInQueue.containsKey(r)) {
|
||||||
// This entry has some delay
|
// This entry has some delay
|
||||||
FlushRegionEntry fqe =
|
FlushRegionEntry fqe =
|
||||||
new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
|
new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
|
||||||
fqe.requeue(delay);
|
fqe.requeue(delay);
|
||||||
this.regionsInQueue.put(r, fqe);
|
this.regionsInQueue.put(r, fqe);
|
||||||
this.flushQueue.add(fqe);
|
this.flushQueue.add(fqe);
|
||||||
|
@ -576,7 +581,7 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
|
return flushRegion(region, false, fqe.families, fqe.getTracker());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -586,13 +591,13 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
* needs to be removed from the flush queue. If false, when we were called
|
* needs to be removed from the flush queue. If false, when we were called
|
||||||
* from the main flusher run loop and we got the entry to flush by calling
|
* from the main flusher run loop and we got the entry to flush by calling
|
||||||
* poll on the flush queue (which removed it).
|
* poll on the flush queue (which removed it).
|
||||||
* @param forceFlushAllStores whether we want to flush all store.
|
* @param families stores of region to flush.
|
||||||
* @return true if the region was successfully flushed, false otherwise. If
|
* @return true if the region was successfully flushed, false otherwise. If
|
||||||
* false, there will be accompanying log messages explaining why the region was
|
* false, there will be accompanying log messages explaining why the region was
|
||||||
* not flushed.
|
* not flushed.
|
||||||
*/
|
*/
|
||||||
private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
|
private boolean flushRegion(HRegion region, boolean emergencyFlush,
|
||||||
FlushLifeCycleTracker tracker) {
|
List<byte[]> families, FlushLifeCycleTracker tracker) {
|
||||||
synchronized (this.regionsInQueue) {
|
synchronized (this.regionsInQueue) {
|
||||||
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
|
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
|
||||||
// Use the start time of the FlushRegionEntry if available
|
// Use the start time of the FlushRegionEntry if available
|
||||||
|
@ -607,7 +612,7 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
notifyFlushRequest(region, emergencyFlush);
|
notifyFlushRequest(region, emergencyFlush);
|
||||||
FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
|
FlushResult flushResult = region.flushcache(families, false, tracker);
|
||||||
boolean shouldCompact = flushResult.isCompactionNeeded();
|
boolean shouldCompact = flushResult.isCompactionNeeded();
|
||||||
// We just want to check the size
|
// We just want to check the size
|
||||||
boolean shouldSplit = region.checkSplit().isPresent();
|
boolean shouldSplit = region.checkSplit().isPresent();
|
||||||
|
@ -840,15 +845,16 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
private long whenToExpire;
|
private long whenToExpire;
|
||||||
private int requeueCount = 0;
|
private int requeueCount = 0;
|
||||||
|
|
||||||
private final boolean forceFlushAllStores;
|
private final List<byte[]> families;
|
||||||
|
|
||||||
private final FlushLifeCycleTracker tracker;
|
private final FlushLifeCycleTracker tracker;
|
||||||
|
|
||||||
FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
|
FlushRegionEntry(final HRegion r, List<byte[]> families,
|
||||||
|
FlushLifeCycleTracker tracker) {
|
||||||
this.region = r;
|
this.region = r;
|
||||||
this.createTime = EnvironmentEdgeManager.currentTime();
|
this.createTime = EnvironmentEdgeManager.currentTime();
|
||||||
this.whenToExpire = this.createTime;
|
this.whenToExpire = this.createTime;
|
||||||
this.forceFlushAllStores = forceFlushAllStores;
|
this.families = families;
|
||||||
this.tracker = tracker;
|
this.tracker = tracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -868,13 +874,6 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
return this.requeueCount;
|
return this.requeueCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return whether we need to flush all stores.
|
|
||||||
*/
|
|
||||||
public boolean isForceFlushAllStores() {
|
|
||||||
return forceFlushAllStores;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FlushLifeCycleTracker getTracker() {
|
public FlushLifeCycleTracker getTracker() {
|
||||||
return tracker;
|
return tracker;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.commons.lang3.mutable.MutableLong;
|
import org.apache.commons.lang3.mutable.MutableLong;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -527,7 +528,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[][] rollWriter() throws FailedLogCloseException, IOException {
|
public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
|
||||||
return rollWriter(false);
|
return rollWriter(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -622,10 +623,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
|
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
|
||||||
* check the first (oldest) WAL, and return those regions which should be flushed so that
|
* check the first (oldest) WAL, and return those regions which should be flushed so that
|
||||||
* it can be let-go/'archived'.
|
* it can be let-go/'archived'.
|
||||||
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
|
* @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file.
|
||||||
*/
|
*/
|
||||||
byte[][] findRegionsToForceFlush() throws IOException {
|
Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
|
||||||
byte[][] regions = null;
|
Map<byte[], List<byte[]>> regions = null;
|
||||||
int logCount = getNumRolledLogFiles();
|
int logCount = getNumRolledLogFiles();
|
||||||
if (logCount > this.maxLogs && logCount > 0) {
|
if (logCount > this.maxLogs && logCount > 0) {
|
||||||
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
|
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
|
||||||
|
@ -633,15 +634,20 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
|
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
|
||||||
}
|
}
|
||||||
if (regions != null) {
|
if (regions != null) {
|
||||||
StringBuilder sb = new StringBuilder();
|
List<String> listForPrint = new ArrayList();
|
||||||
for (int i = 0; i < regions.length; i++) {
|
for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
|
||||||
|
StringBuilder families = new StringBuilder();
|
||||||
|
for (int i = 0; i < r.getValue().size(); i++) {
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
sb.append(", ");
|
families.append(",");
|
||||||
}
|
}
|
||||||
sb.append(Bytes.toStringBinary(regions[i]));
|
families.append(Bytes.toString(r.getValue().get(i)));
|
||||||
|
}
|
||||||
|
listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
|
||||||
}
|
}
|
||||||
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
|
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
|
||||||
"; forcing flush of " + regions.length + " regions(s): " + sb.toString());
|
"; forcing (partial) flush of " + regions.size() + " region(s): " +
|
||||||
|
StringUtils.join(",", listForPrint));
|
||||||
}
|
}
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
@ -778,14 +784,14 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
|
public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
|
||||||
rollWriterLock.lock();
|
rollWriterLock.lock();
|
||||||
try {
|
try {
|
||||||
// Return if nothing to flush.
|
// Return if nothing to flush.
|
||||||
if (!force && this.writer != null && this.numEntries.get() <= 0) {
|
if (!force && this.writer != null && this.numEntries.get() <= 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
byte[][] regionsToFlush = null;
|
Map<byte[], List<byte[]>> regionsToFlush = null;
|
||||||
if (this.closed) {
|
if (this.closed) {
|
||||||
LOG.debug("WAL closed. Skipping rolling of writer");
|
LOG.debug("WAL closed. Skipping rolling of writer");
|
||||||
return regionsToFlush;
|
return regionsToFlush;
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -431,10 +432,10 @@ class SequenceIdAccounting {
|
||||||
* {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in
|
* {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in
|
||||||
* <code>sequenceids</code> then return it.
|
* <code>sequenceids</code> then return it.
|
||||||
* @param sequenceids Sequenceids keyed by encoded region name.
|
* @param sequenceids Sequenceids keyed by encoded region name.
|
||||||
* @return regions found in this instance with sequence ids less than those passed in.
|
* @return stores of regions found in this instance with sequence ids less than those passed in.
|
||||||
*/
|
*/
|
||||||
byte[][] findLower(Map<byte[], Long> sequenceids) {
|
Map<byte[], List<byte[]>> findLower(Map<byte[], Long> sequenceids) {
|
||||||
List<byte[]> toFlush = null;
|
Map<byte[], List<byte[]>> toFlush = null;
|
||||||
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
|
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
|
||||||
synchronized (tieLock) {
|
synchronized (tieLock) {
|
||||||
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
|
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
|
||||||
|
@ -442,16 +443,17 @@ class SequenceIdAccounting {
|
||||||
if (m == null) {
|
if (m == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// The lowest sequence id outstanding for this region.
|
for (Map.Entry<ImmutableByteArray, Long> me : m.entrySet()) {
|
||||||
long lowest = getLowestSequenceId(m);
|
if (me.getValue() <= e.getValue()) {
|
||||||
if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
|
|
||||||
if (toFlush == null) {
|
if (toFlush == null) {
|
||||||
toFlush = new ArrayList<>();
|
toFlush = new TreeMap(Bytes.BYTES_COMPARATOR);
|
||||||
}
|
}
|
||||||
toFlush.add(e.getKey());
|
toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
|
||||||
|
.add(Bytes.toBytes(me.getKey().toString()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return toFlush == null ? null : toFlush.toArray(new byte[0][]);
|
}
|
||||||
|
return toFlush;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.net.ConnectException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
@ -44,8 +45,8 @@ import org.slf4j.LoggerFactory;
|
||||||
* NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
|
* NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
|
||||||
* there is something to do, rather than the Chore sleep time which is invariant.
|
* there is something to do, rather than the Chore sleep time which is invariant.
|
||||||
* <p/>
|
* <p/>
|
||||||
* The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
|
* The {@link #scheduleFlush(String, List)} is abstract here,
|
||||||
* region server but we still want to roll its WAL.
|
* as sometimes we hold a region without a region server but we still want to roll its WAL.
|
||||||
* <p/>
|
* <p/>
|
||||||
* TODO: change to a pool of threads
|
* TODO: change to a pool of threads
|
||||||
*/
|
*/
|
||||||
|
@ -180,11 +181,12 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
// reset the flag in front to avoid missing roll request before we return from rollWriter.
|
// reset the flag in front to avoid missing roll request before we return from rollWriter.
|
||||||
walNeedsRoll.put(wal, Boolean.FALSE);
|
walNeedsRoll.put(wal, Boolean.FALSE);
|
||||||
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
// Force the roll if the logroll.period is elapsed or if a roll was requested.
|
||||||
// The returned value is an array of actual region names.
|
// The returned value is an collection of actual region and family names.
|
||||||
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
|
Map<byte[], List<byte[]>> regionsToFlush = wal.rollWriter(periodic ||
|
||||||
|
entry.getValue().booleanValue());
|
||||||
if (regionsToFlush != null) {
|
if (regionsToFlush != null) {
|
||||||
for (byte[] r : regionsToFlush) {
|
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
|
||||||
scheduleFlush(Bytes.toString(r));
|
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
afterRoll(wal);
|
afterRoll(wal);
|
||||||
|
@ -211,8 +213,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param encodedRegionName Encoded name of region to flush.
|
* @param encodedRegionName Encoded name of region to flush.
|
||||||
|
* @param families stores of region to flush.
|
||||||
*/
|
*/
|
||||||
protected abstract void scheduleFlush(String encodedRegionName);
|
protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
|
||||||
|
|
||||||
private boolean isWaiting() {
|
private boolean isWaiting() {
|
||||||
Thread.State state = getState();
|
Thread.State state = getState();
|
||||||
|
|
|
@ -115,7 +115,7 @@ class DisabledWALProvider implements WALProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[][] rollWriter() {
|
public Map<byte[], List<byte[]>> rollWriter() {
|
||||||
if (!listeners.isEmpty()) {
|
if (!listeners.isEmpty()) {
|
||||||
for (WALActionsListener listener : listeners) {
|
for (WALActionsListener listener : listeners) {
|
||||||
listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
|
listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
|
||||||
|
@ -139,7 +139,7 @@ class DisabledWALProvider implements WALProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[][] rollWriter(boolean force) {
|
public Map<byte[], List<byte[]>> rollWriter(boolean force) {
|
||||||
return rollWriter();
|
return rollWriter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -61,11 +62,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
||||||
* The implementation is synchronized in order to make sure there's one rollWriter
|
* The implementation is synchronized in order to make sure there's one rollWriter
|
||||||
* running at any given time.
|
* running at any given time.
|
||||||
*
|
*
|
||||||
* @return If lots of logs, flush the returned regions so next time through we
|
* @return If lots of logs, flush the stores of returned regions so next time through we
|
||||||
* can clean logs. Returns null if nothing to flush. Names are actual
|
* can clean logs. Returns null if nothing to flush. Names are actual
|
||||||
* region names as returned by {@link RegionInfo#getEncodedName()}
|
* region names as returned by {@link RegionInfo#getEncodedName()}
|
||||||
*/
|
*/
|
||||||
byte[][] rollWriter() throws FailedLogCloseException, IOException;
|
Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Roll the log writer. That is, start writing log messages to a new file.
|
* Roll the log writer. That is, start writing log messages to a new file.
|
||||||
|
@ -77,11 +78,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
||||||
* @param force
|
* @param force
|
||||||
* If true, force creation of a new writer even if no entries have
|
* If true, force creation of a new writer even if no entries have
|
||||||
* been written to the current writer
|
* been written to the current writer
|
||||||
* @return If lots of logs, flush the returned regions so next time through we
|
* @return If lots of logs, flush the stores of returned regions so next time through we
|
||||||
* can clean logs. Returns null if nothing to flush. Names are actual
|
* can clean logs. Returns null if nothing to flush. Names are actual
|
||||||
* region names as returned by {@link RegionInfo#getEncodedName()}
|
* region names as returned by {@link RegionInfo#getEncodedName()}
|
||||||
*/
|
*/
|
||||||
byte[][] rollWriter(boolean force) throws IOException;
|
Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
|
* Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
|
||||||
|
|
|
@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -121,8 +123,9 @@ public class TestFailedAppendAndSync {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
|
public Map<byte[], List<byte[]>> rollWriter(boolean force)
|
||||||
byte [][] regions = super.rollWriter(force);
|
throws FailedLogCloseException, IOException {
|
||||||
|
Map<byte[], List<byte[]>> regions = super.rollWriter(force);
|
||||||
rolls.getAndIncrement();
|
rolls.getAndIncrement();
|
||||||
return regions;
|
return regions;
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,8 +66,8 @@ public class TestFlushRegionEntry {
|
||||||
HRegion r = mock(HRegion.class);
|
HRegion r = mock(HRegion.class);
|
||||||
doReturn(hri).when(r).getRegionInfo();
|
doReturn(hri).when(r).getRegionInfo();
|
||||||
|
|
||||||
FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
|
FlushRegionEntry entry = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
|
||||||
FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
|
FlushRegionEntry other = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
|
||||||
|
|
||||||
assertEquals(entry.hashCode(), other.hashCode());
|
assertEquals(entry.hashCode(), other.hashCode());
|
||||||
assertEquals(entry, other);
|
assertEquals(entry, other);
|
||||||
|
|
|
@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hbase.ChoreService;
|
import org.apache.hadoop.hbase.ChoreService;
|
||||||
|
@ -139,11 +141,11 @@ public class TestHeapMemoryManager {
|
||||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||||
heapMemoryManager.start(choreService);
|
heapMemoryManager.start(choreService);
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
Thread.sleep(1500);
|
Thread.sleep(1500);
|
||||||
// No changes should be made by tuner as we already have lot of empty space
|
// No changes should be made by tuner as we already have lot of empty space
|
||||||
|
@ -182,10 +184,10 @@ public class TestHeapMemoryManager {
|
||||||
// do some offheap flushes also. So there should be decrease in memstore but
|
// do some offheap flushes also. So there should be decrease in memstore but
|
||||||
// not as that when we don't have offheap flushes
|
// not as that when we don't have offheap flushes
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||||
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||||
|
@ -230,10 +232,10 @@ public class TestHeapMemoryManager {
|
||||||
// do some offheap flushes also. So there should be decrease in memstore but
|
// do some offheap flushes also. So there should be decrease in memstore but
|
||||||
// not as that when we don't have offheap flushes
|
// not as that when we don't have offheap flushes
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||||
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||||
|
@ -246,10 +248,10 @@ public class TestHeapMemoryManager {
|
||||||
// flushes are due to onheap overhead. This should once again call for increase in
|
// flushes are due to onheap overhead. This should once again call for increase in
|
||||||
// memstore size but that increase should be to the safe size
|
// memstore size but that increase should be to the safe size
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||||
assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||||
|
@ -312,10 +314,10 @@ public class TestHeapMemoryManager {
|
||||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||||
heapMemoryManager.start(choreService);
|
heapMemoryManager.start(choreService);
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||||
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
|
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
|
||||||
|
@ -326,8 +328,8 @@ public class TestHeapMemoryManager {
|
||||||
oldBlockCacheSize = blockCache.maxSize;
|
oldBlockCacheSize = blockCache.maxSize;
|
||||||
// Do some more flushes before the next run of HeapMemoryTuner
|
// Do some more flushes before the next run of HeapMemoryTuner
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||||
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
|
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
|
||||||
|
@ -361,10 +363,10 @@ public class TestHeapMemoryManager {
|
||||||
heapMemoryManager.start(choreService);
|
heapMemoryManager.start(choreService);
|
||||||
// this should not change anything with onheap memstore
|
// this should not change anything with onheap memstore
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
Thread.sleep(1500);
|
Thread.sleep(1500);
|
||||||
// No changes should be made by tuner as we already have lot of empty space
|
// No changes should be made by tuner as we already have lot of empty space
|
||||||
|
@ -448,9 +450,9 @@ public class TestHeapMemoryManager {
|
||||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||||
heapMemoryManager.start(choreService);
|
heapMemoryManager.start(choreService);
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
blockCache.evictBlock(null);
|
blockCache.evictBlock(null);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
Thread.sleep(1500);
|
Thread.sleep(1500);
|
||||||
|
@ -459,9 +461,9 @@ public class TestHeapMemoryManager {
|
||||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||||
// Do some more flushes before the next run of HeapMemoryTuner
|
// Do some more flushes before the next run of HeapMemoryTuner
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||||
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
|
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
|
||||||
|
@ -494,9 +496,9 @@ public class TestHeapMemoryManager {
|
||||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||||
heapMemoryManager.start(choreService);
|
heapMemoryManager.start(choreService);
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
blockCache.evictBlock(null);
|
blockCache.evictBlock(null);
|
||||||
blockCache.evictBlock(null);
|
blockCache.evictBlock(null);
|
||||||
// Allow the tuner to run once and do necessary memory up
|
// Allow the tuner to run once and do necessary memory up
|
||||||
|
@ -506,7 +508,7 @@ public class TestHeapMemoryManager {
|
||||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||||
// Flushes that block updates
|
// Flushes that block updates
|
||||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||||
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
|
memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
|
||||||
blockCache.evictBlock(null);
|
blockCache.evictBlock(null);
|
||||||
blockCache.evictBlock(null);
|
blockCache.evictBlock(null);
|
||||||
blockCache.evictBlock(null);
|
blockCache.evictBlock(null);
|
||||||
|
@ -752,14 +754,19 @@ public class TestHeapMemoryManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requestFlush(HRegion region, boolean forceFlushAllStores,
|
public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
|
||||||
FlushLifeCycleTracker tracker) {
|
|
||||||
this.listener.flushRequested(flushType, region);
|
this.listener.flushRequested(flushType, region);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
|
public boolean requestFlush(HRegion region, List<byte[]> families,
|
||||||
|
FlushLifeCycleTracker tracker) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requestDelayedFlush(HRegion region, long delay) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class TestSplitWalDataLoss {
|
||||||
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
|
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
|
||||||
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
|
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
|
||||||
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
|
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
|
||||||
rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
|
rs.getMemStoreFlusher().requestFlush(spiedRegion, FlushLifeCycleTracker.DUMMY);
|
||||||
synchronized (flushed) {
|
synchronized (flushed) {
|
||||||
while (!flushed.booleanValue()) {
|
while (!flushed.booleanValue()) {
|
||||||
flushed.wait();
|
flushed.wait();
|
||||||
|
|
|
@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
@ -169,9 +171,9 @@ public abstract class AbstractTestFSWAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
|
protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
|
||||||
MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
|
MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final byte[] row = Bytes.toBytes("row");
|
final byte[] row = Bytes.toBytes(cf);
|
||||||
for (int i = 0; i < times; i++) {
|
for (int i = 0; i < times; i++) {
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
WALEdit cols = new WALEdit();
|
WALEdit cols = new WALEdit();
|
||||||
|
@ -253,8 +255,8 @@ public abstract class AbstractTestFSWAL {
|
||||||
* regions which should be flushed in order to archive the oldest wal file.
|
* regions which should be flushed in order to archive the oldest wal file.
|
||||||
* <p>
|
* <p>
|
||||||
* This method tests this behavior by inserting edits and rolling the wal enough times to reach
|
* This method tests this behavior by inserting edits and rolling the wal enough times to reach
|
||||||
* the max number of logs threshold. It checks whether we get the "right regions" for flush on
|
* the max number of logs threshold. It checks whether we get the "right regions and stores" for
|
||||||
* rolling the wal.
|
* flush on rolling the wal.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
|
@ -264,12 +266,23 @@ public abstract class AbstractTestFSWAL {
|
||||||
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
||||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
|
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
|
||||||
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
|
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
|
||||||
|
String cf1 = "cf1";
|
||||||
|
String cf2 = "cf2";
|
||||||
|
String cf3 = "cf3";
|
||||||
TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
|
TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
|
||||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
|
||||||
TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
|
TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
|
||||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
|
||||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
|
RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
|
||||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
|
RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
|
||||||
|
|
||||||
|
List<ColumnFamilyDescriptor> cfs = new ArrayList();
|
||||||
|
cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
|
||||||
|
cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
|
||||||
|
TableDescriptor t3 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t3"))
|
||||||
|
.setColumnFamilies(cfs).build();
|
||||||
|
RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();
|
||||||
|
|
||||||
// add edits and roll the wal
|
// add edits and roll the wal
|
||||||
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||||
NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
@ -280,26 +293,30 @@ public abstract class AbstractTestFSWAL {
|
||||||
for (byte[] fam : t2.getColumnFamilyNames()) {
|
for (byte[] fam : t2.getColumnFamilyNames()) {
|
||||||
scopes2.put(fam, 0);
|
scopes2.put(fam, 0);
|
||||||
}
|
}
|
||||||
|
NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
for (byte[] fam : t3.getColumnFamilyNames()) {
|
||||||
|
scopes3.put(fam, 0);
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
|
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||||
wal.rollWriter();
|
wal.rollWriter();
|
||||||
// add some more edits and roll the wal. This would reach the log number threshold
|
// add some more edits and roll the wal. This would reach the log number threshold
|
||||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
|
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||||
wal.rollWriter();
|
wal.rollWriter();
|
||||||
// with above rollWriter call, the max logs limit is reached.
|
// with above rollWriter call, the max logs limit is reached.
|
||||||
assertTrue(wal.getNumRolledLogFiles() == 2);
|
assertTrue(wal.getNumRolledLogFiles() == 2);
|
||||||
|
|
||||||
// get the regions to flush; since there is only one region in the oldest wal, it should
|
// get the regions to flush; since there is only one region in the oldest wal, it should
|
||||||
// return only one region.
|
// return only one region.
|
||||||
byte[][] regionsToFlush = wal.findRegionsToForceFlush();
|
Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
|
||||||
assertEquals(1, regionsToFlush.length);
|
assertEquals(1, regionsToFlush.size());
|
||||||
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
|
assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
|
||||||
// insert edits in second region
|
// insert edits in second region
|
||||||
addEdits(wal, hri2, t2, 2, mvcc, scopes2);
|
addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
|
||||||
// get the regions to flush, it should still read region1.
|
// get the regions to flush, it should still read region1.
|
||||||
regionsToFlush = wal.findRegionsToForceFlush();
|
regionsToFlush = wal.findRegionsToForceFlush();
|
||||||
assertEquals(1, regionsToFlush.length);
|
assertEquals(1, regionsToFlush.size());
|
||||||
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
|
assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
|
||||||
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
|
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
|
||||||
// remain.
|
// remain.
|
||||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||||
|
@ -312,29 +329,50 @@ public abstract class AbstractTestFSWAL {
|
||||||
// no wal should remain now.
|
// no wal should remain now.
|
||||||
assertEquals(0, wal.getNumRolledLogFiles());
|
assertEquals(0, wal.getNumRolledLogFiles());
|
||||||
// add edits both to region 1 and region 2, and roll.
|
// add edits both to region 1 and region 2, and roll.
|
||||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
|
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||||
addEdits(wal, hri2, t2, 2, mvcc, scopes2);
|
addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
|
||||||
wal.rollWriter();
|
wal.rollWriter();
|
||||||
// add edits and roll the writer, to reach the max logs limit.
|
// add edits and roll the writer, to reach the max logs limit.
|
||||||
assertEquals(1, wal.getNumRolledLogFiles());
|
assertEquals(1, wal.getNumRolledLogFiles());
|
||||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
|
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||||
wal.rollWriter();
|
wal.rollWriter();
|
||||||
// it should return two regions to flush, as the oldest wal file has entries
|
// it should return two regions to flush, as the oldest wal file has entries
|
||||||
// for both regions.
|
// for both regions.
|
||||||
regionsToFlush = wal.findRegionsToForceFlush();
|
regionsToFlush = wal.findRegionsToForceFlush();
|
||||||
assertEquals(2, regionsToFlush.length);
|
assertEquals(2, regionsToFlush.size());
|
||||||
// flush both regions
|
// flush both regions
|
||||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||||
wal.rollWriter(true);
|
wal.rollWriter(true);
|
||||||
assertEquals(0, wal.getNumRolledLogFiles());
|
assertEquals(0, wal.getNumRolledLogFiles());
|
||||||
// Add an edit to region1, and roll the wal.
|
// Add an edit to region1, and roll the wal.
|
||||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
|
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||||
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
|
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
|
||||||
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||||
wal.rollWriter();
|
wal.rollWriter();
|
||||||
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
|
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
|
||||||
assertEquals(1, wal.getNumRolledLogFiles());
|
assertEquals(1, wal.getNumRolledLogFiles());
|
||||||
|
|
||||||
|
// clear test data
|
||||||
|
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||||
|
wal.rollWriter(true);
|
||||||
|
// add edits for three familes
|
||||||
|
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
|
||||||
|
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
|
||||||
|
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
|
||||||
|
wal.rollWriter();
|
||||||
|
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
|
||||||
|
wal.rollWriter();
|
||||||
|
assertEquals(2, wal.getNumRolledLogFiles());
|
||||||
|
// flush one family before archive oldest wal
|
||||||
|
Set<byte[]> flushedFamilyNames = new HashSet<>();
|
||||||
|
flushedFamilyNames.add(Bytes.toBytes(cf1));
|
||||||
|
flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
|
||||||
|
regionsToFlush = wal.findRegionsToForceFlush();
|
||||||
|
// then only two family need to be flushed when archive oldest wal
|
||||||
|
assertEquals(1, regionsToFlush.size());
|
||||||
|
assertEquals(hri3.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
|
||||||
|
assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
|
||||||
} finally {
|
} finally {
|
||||||
if (wal != null) {
|
if (wal != null) {
|
||||||
wal.close();
|
wal.close();
|
||||||
|
|
|
@ -1109,9 +1109,9 @@ public abstract class AbstractTestWALReplay {
|
||||||
private HRegion r;
|
private HRegion r;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
|
public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
|
||||||
try {
|
try {
|
||||||
r.flush(force);
|
r.flush(false);
|
||||||
return true;
|
return true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException("Exception flushing", e);
|
throw new RuntimeException("Exception flushing", e);
|
||||||
|
@ -1119,7 +1119,13 @@ public abstract class AbstractTestWALReplay {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
|
public boolean requestFlush(HRegion region, List<byte[]> families,
|
||||||
|
FlushLifeCycleTracker tracker) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requestDelayedFlush(HRegion region, long when) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
||||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||||
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
addEdits(log, hri, htd, 1, mvcc, scopes);
|
addEdits(log, hri, htd, 1, mvcc, scopes, "row");
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
log.close();
|
log.close();
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class TestSequenceIdAccounting {
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
||||||
assertTrue(sida.findLower(m) == null);
|
assertTrue(sida.findLower(m) == null);
|
||||||
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
|
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
|
||||||
assertTrue(sida.findLower(m).length == 1);
|
assertTrue(sida.findLower(m).size() == 1);
|
||||||
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
|
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
|
||||||
assertTrue(sida.findLower(m) == null);
|
assertTrue(sida.findLower(m) == null);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue