HDDS-2238. Container Data Scrubber spams log in empty cluster
Signed-off-by: Anu Engineer <aengineer@apache.org>
This commit is contained in:
parent
382967be51
commit
1877312440
|
@ -32,6 +32,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
|
@ -40,7 +41,6 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -50,17 +50,17 @@ public class ContainerSet {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ContainerSet.class);
|
||||
|
||||
private final ConcurrentSkipListMap<Long, Container> containerMap = new
|
||||
private final ConcurrentSkipListMap<Long, Container<?>> containerMap = new
|
||||
ConcurrentSkipListMap<>();
|
||||
private final ConcurrentSkipListSet<Long> missingContainerSet =
|
||||
new ConcurrentSkipListSet<>();
|
||||
/**
|
||||
* Add Container to container map.
|
||||
* @param container
|
||||
* @param container container to be added
|
||||
* @return If container is added to containerMap returns true, otherwise
|
||||
* false
|
||||
*/
|
||||
public boolean addContainer(Container container) throws
|
||||
public boolean addContainer(Container<?> container) throws
|
||||
StorageContainerException {
|
||||
Preconditions.checkNotNull(container, "container cannot be null");
|
||||
|
||||
|
@ -81,10 +81,10 @@ public class ContainerSet {
|
|||
|
||||
/**
|
||||
* Returns the Container with specified containerId.
|
||||
* @param containerId
|
||||
* @param containerId ID of the container to get
|
||||
* @return Container
|
||||
*/
|
||||
public Container getContainer(long containerId) {
|
||||
public Container<?> getContainer(long containerId) {
|
||||
Preconditions.checkState(containerId >= 0,
|
||||
"Container Id cannot be negative.");
|
||||
return containerMap.get(containerId);
|
||||
|
@ -92,14 +92,14 @@ public class ContainerSet {
|
|||
|
||||
/**
|
||||
* Removes the Container matching with specified containerId.
|
||||
* @param containerId
|
||||
* @param containerId ID of the container to remove
|
||||
* @return If container is removed from containerMap returns true, otherwise
|
||||
* false
|
||||
*/
|
||||
public boolean removeContainer(long containerId) {
|
||||
Preconditions.checkState(containerId >= 0,
|
||||
"Container Id cannot be negative.");
|
||||
Container removed = containerMap.remove(containerId);
|
||||
Container<?> removed = containerMap.remove(containerId);
|
||||
if(removed == null) {
|
||||
LOG.debug("Container with containerId {} is not present in " +
|
||||
"containerMap", containerId);
|
||||
|
@ -122,9 +122,9 @@ public class ContainerSet {
|
|||
|
||||
/**
|
||||
* Return an container Iterator over {@link ContainerSet#containerMap}.
|
||||
* @return {@literal Iterator<Container>}
|
||||
* @return {@literal Iterator<Container<?>>}
|
||||
*/
|
||||
public Iterator<Container> getContainerIterator() {
|
||||
public Iterator<Container<?>> getContainerIterator() {
|
||||
return containerMap.values().iterator();
|
||||
}
|
||||
|
||||
|
@ -132,26 +132,23 @@ public class ContainerSet {
|
|||
* Return an iterator of containers associated with the specified volume.
|
||||
*
|
||||
* @param volume the HDDS volume which should be used to filter containers
|
||||
* @return {@literal Iterator<Container>}
|
||||
* @return {@literal Iterator<Container<?>>}
|
||||
*/
|
||||
public Iterator<Container> getContainerIterator(HddsVolume volume) {
|
||||
public Iterator<Container<?>> getContainerIterator(HddsVolume volume) {
|
||||
Preconditions.checkNotNull(volume);
|
||||
Preconditions.checkNotNull(volume.getStorageID());
|
||||
String volumeUuid = volume.getStorageID();
|
||||
return containerMap.values()
|
||||
.stream()
|
||||
.filter(x -> volumeUuid.equals(
|
||||
x.getContainerData().getVolume()
|
||||
.getStorageID()))
|
||||
.iterator();
|
||||
return containerMap.values().stream()
|
||||
.filter(x -> volumeUuid.equals(x.getContainerData().getVolume()
|
||||
.getStorageID()))
|
||||
.iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an containerMap iterator over {@link ContainerSet#containerMap}.
|
||||
* @return containerMap Iterator
|
||||
*/
|
||||
public Iterator<Map.Entry<Long, Container>> getContainerMapIterator() {
|
||||
containerMap.keySet().stream().collect(Collectors.toSet());
|
||||
public Iterator<Map.Entry<Long, Container<?>>> getContainerMapIterator() {
|
||||
return containerMap.entrySet().iterator();
|
||||
}
|
||||
|
||||
|
@ -160,11 +157,11 @@ public class ContainerSet {
|
|||
* @return containerMap
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public Map<Long, Container> getContainerMapCopy() {
|
||||
public Map<Long, Container<?>> getContainerMapCopy() {
|
||||
return ImmutableMap.copyOf(containerMap);
|
||||
}
|
||||
|
||||
public Map<Long, Container> getContainerMap() {
|
||||
public Map<Long, Container<?>> getContainerMap() {
|
||||
return Collections.unmodifiableMap(containerMap);
|
||||
}
|
||||
|
||||
|
@ -179,7 +176,6 @@ public class ContainerSet {
|
|||
* @param startContainerId - Return containers with Id >= startContainerId.
|
||||
* @param count - how many to return
|
||||
* @param data - Actual containerData
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
public void listContainer(long startContainerId, long count,
|
||||
List<ContainerData> data) throws
|
||||
|
@ -193,14 +189,14 @@ public class ContainerSet {
|
|||
"must be positive");
|
||||
LOG.debug("listContainer returns containerData starting from {} of count " +
|
||||
"{}", startContainerId, count);
|
||||
ConcurrentNavigableMap<Long, Container> map;
|
||||
ConcurrentNavigableMap<Long, Container<?>> map;
|
||||
if (startContainerId == 0) {
|
||||
map = containerMap.tailMap(containerMap.firstKey(), true);
|
||||
} else {
|
||||
map = containerMap.tailMap(startContainerId, true);
|
||||
}
|
||||
int currentCount = 0;
|
||||
for (Container entry : map.values()) {
|
||||
for (Container<?> entry : map.values()) {
|
||||
if (currentCount < count) {
|
||||
data.add(entry.getContainerData());
|
||||
currentCount++;
|
||||
|
@ -214,7 +210,6 @@ public class ContainerSet {
|
|||
* Get container report.
|
||||
*
|
||||
* @return The container report.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ContainerReportsProto getContainerReport() throws IOException {
|
||||
LOG.debug("Starting container report iteration.");
|
||||
|
@ -222,13 +217,12 @@ public class ContainerSet {
|
|||
// No need for locking since containerMap is a ConcurrentSkipListMap
|
||||
// And we can never get the exact state since close might happen
|
||||
// after we iterate a point.
|
||||
List<Container> containers = containerMap.values().stream().collect(
|
||||
Collectors.toList());
|
||||
List<Container<?>> containers = new ArrayList<>(containerMap.values());
|
||||
|
||||
ContainerReportsProto.Builder crBuilder =
|
||||
ContainerReportsProto.newBuilder();
|
||||
|
||||
for (Container container: containers) {
|
||||
for (Container<?> container: containers) {
|
||||
crBuilder.addReports(container.getContainerReport());
|
||||
}
|
||||
|
||||
|
@ -257,7 +251,7 @@ public class ContainerSet {
|
|||
LOG.warn("Adding container {} to missing container set.", id);
|
||||
missingContainerSet.add(id);
|
||||
} else {
|
||||
Container container = containerMap.get(id);
|
||||
Container<?> container = containerMap.get(id);
|
||||
long containerBCSID = container.getBlockCommitSequenceId();
|
||||
long snapshotBCSID = mapEntry.getValue();
|
||||
if (containerBCSID < snapshotBCSID) {
|
||||
|
|
|
@ -140,7 +140,6 @@ public class ContainerController {
|
|||
* @param containerId Id of the container to be deleted
|
||||
* @param force if this is set to true, we delete container without checking
|
||||
* state of the container.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void deleteContainer(final long containerId, boolean force)
|
||||
throws IOException {
|
||||
|
@ -160,7 +159,7 @@ public class ContainerController {
|
|||
return handlers.get(container.getContainerType());
|
||||
}
|
||||
|
||||
public Iterator<Container> getContainers() {
|
||||
public Iterator<Container<?>> getContainers() {
|
||||
return containerSet.getContainerIterator();
|
||||
}
|
||||
|
||||
|
@ -171,7 +170,8 @@ public class ContainerController {
|
|||
* @param volume the HDDS volume which should be used to filter containers
|
||||
* @return {@literal Iterator<Container>}
|
||||
*/
|
||||
public Iterator<Container> getContainers(HddsVolume volume) {
|
||||
public Iterator<Container<?>> getContainers(HddsVolume volume) {
|
||||
return containerSet.getContainerIterator(volume);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.util.Iterator;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.util.Canceler;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
|
@ -46,6 +45,7 @@ public class ContainerDataScanner extends Thread {
|
|||
private final DataTransferThrottler throttler;
|
||||
private final Canceler canceler;
|
||||
private final ContainerDataScrubberMetrics metrics;
|
||||
private final long dataScanInterval;
|
||||
|
||||
/**
|
||||
* True if the thread is stopping.<p/>
|
||||
|
@ -54,15 +54,15 @@ public class ContainerDataScanner extends Thread {
|
|||
private volatile boolean stopping = false;
|
||||
|
||||
|
||||
public ContainerDataScanner(Configuration conf,
|
||||
public ContainerDataScanner(ContainerScrubberConfiguration conf,
|
||||
ContainerController controller,
|
||||
HddsVolume volume, long bytesPerSec) {
|
||||
HddsVolume volume) {
|
||||
this.controller = controller;
|
||||
this.volume = volume;
|
||||
this.throttler = new HddsDataTransferThrottler(bytesPerSec);
|
||||
this.canceler = new Canceler();
|
||||
this.metrics = ContainerDataScrubberMetrics.create(conf,
|
||||
volume.toString());
|
||||
dataScanInterval = conf.getDataScanInterval();
|
||||
throttler = new HddsDataTransferThrottler(conf.getBandwidthPerVolume());
|
||||
canceler = new Canceler();
|
||||
metrics = ContainerDataScrubberMetrics.create(volume.toString());
|
||||
setName("ContainerDataScanner(" + volume + ")");
|
||||
setDaemon(true);
|
||||
}
|
||||
|
@ -89,7 +89,7 @@ public class ContainerDataScanner extends Thread {
|
|||
@VisibleForTesting
|
||||
public void runIteration() {
|
||||
long startTime = System.nanoTime();
|
||||
Iterator<Container> itr = controller.getContainers(volume);
|
||||
Iterator<Container<?>> itr = controller.getContainers(volume);
|
||||
while (!stopping && itr.hasNext()) {
|
||||
Container c = itr.next();
|
||||
if (c.shouldScanData()) {
|
||||
|
@ -110,16 +110,26 @@ public class ContainerDataScanner extends Thread {
|
|||
}
|
||||
long totalDuration = System.nanoTime() - startTime;
|
||||
if (!stopping) {
|
||||
metrics.incNumScanIterations();
|
||||
LOG.info("Completed an iteration of container data scrubber in" +
|
||||
" {} minutes." +
|
||||
" Number of iterations (since the data-node restart) : {}" +
|
||||
", Number of containers scanned in this iteration : {}" +
|
||||
", Number of unhealthy containers found in this iteration : {}",
|
||||
TimeUnit.NANOSECONDS.toMinutes(totalDuration),
|
||||
metrics.getNumScanIterations(),
|
||||
metrics.getNumContainersScanned(),
|
||||
metrics.getNumUnHealthyContainers());
|
||||
if (metrics.getNumContainersScanned() > 0) {
|
||||
metrics.incNumScanIterations();
|
||||
LOG.info("Completed an iteration of container data scrubber in" +
|
||||
" {} minutes." +
|
||||
" Number of iterations (since the data-node restart) : {}" +
|
||||
", Number of containers scanned in this iteration : {}" +
|
||||
", Number of unhealthy containers found in this iteration : {}",
|
||||
TimeUnit.NANOSECONDS.toMinutes(totalDuration),
|
||||
metrics.getNumScanIterations(),
|
||||
metrics.getNumContainersScanned(),
|
||||
metrics.getNumUnHealthyContainers());
|
||||
}
|
||||
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(totalDuration);
|
||||
long remainingSleep = dataScanInterval - elapsedMillis;
|
||||
if (remainingSleep > 0) {
|
||||
try {
|
||||
Thread.sleep(remainingSleep);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
|
@ -103,8 +102,7 @@ public final class ContainerDataScrubberMetrics {
|
|||
this.ms = ms;
|
||||
}
|
||||
|
||||
public static ContainerDataScrubberMetrics create(final Configuration conf,
|
||||
final String volumeName) {
|
||||
public static ContainerDataScrubberMetrics create(final String volumeName) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
String name = "ContainerDataScrubberMetrics-"+ (volumeName.isEmpty()
|
||||
? "UndefinedDataNodeVolume"+ ThreadLocalRandom.current().nextInt()
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -44,24 +43,22 @@ public class ContainerMetadataScanner extends Thread {
|
|||
*/
|
||||
private boolean stopping = false;
|
||||
|
||||
public ContainerMetadataScanner(Configuration conf,
|
||||
ContainerController controller,
|
||||
long metadataScanInterval) {
|
||||
public ContainerMetadataScanner(ContainerScrubberConfiguration conf,
|
||||
ContainerController controller) {
|
||||
this.controller = controller;
|
||||
this.metadataScanInterval = metadataScanInterval;
|
||||
this.metrics = ContainerMetadataScrubberMetrics.create(conf);
|
||||
this.metadataScanInterval = conf.getMetadataScanInterval();
|
||||
this.metrics = ContainerMetadataScrubberMetrics.create();
|
||||
setName("ContainerMetadataScanner");
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
/**
|
||||
* the outer daemon loop exits on down()
|
||||
/*
|
||||
* the outer daemon loop exits on shutdown()
|
||||
*/
|
||||
LOG.info("Background ContainerMetadataScanner starting up");
|
||||
while (!stopping) {
|
||||
long start = System.nanoTime();
|
||||
runIteration();
|
||||
if(!stopping) {
|
||||
metrics.resetNumUnhealthyContainers();
|
||||
|
@ -71,9 +68,9 @@ public class ContainerMetadataScanner extends Thread {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void runIteration() {
|
||||
void runIteration() {
|
||||
long start = System.nanoTime();
|
||||
Iterator<Container> containerIt = controller.getContainers();
|
||||
Iterator<Container<?>> containerIt = controller.getContainers();
|
||||
while (!stopping && containerIt.hasNext()) {
|
||||
Container container = containerIt.next();
|
||||
try {
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
|
@ -83,9 +82,9 @@ public final class ContainerMetadataScrubberMetrics {
|
|||
this.ms = ms;
|
||||
}
|
||||
|
||||
public static ContainerMetadataScrubberMetrics create(Configuration conf) {
|
||||
public static ContainerMetadataScrubberMetrics create() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
String name = "ContainerDataScrubberMetrics";
|
||||
String name = "ContainerMetadataScrubberMetrics";
|
||||
return ms.register(name, null,
|
||||
new ContainerMetadataScrubberMetrics(name, ms));
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.ConfigType;
|
|||
public class ContainerScrubberConfiguration {
|
||||
private boolean enabled;
|
||||
private long metadataScanInterval;
|
||||
private long dataScanInterval;
|
||||
private long bandwidthPerVolume;
|
||||
|
||||
@Config(key = "enabled",
|
||||
|
@ -58,6 +59,22 @@ public class ContainerScrubberConfiguration {
|
|||
return metadataScanInterval;
|
||||
}
|
||||
|
||||
@Config(key = "data.scan.interval",
|
||||
type = ConfigType.TIME,
|
||||
defaultValue = "1m",
|
||||
tags = { ConfigTag.STORAGE },
|
||||
description = "Minimum time interval between two iterations of container"
|
||||
+ " data scanning. If an iteration takes less time than this, the"
|
||||
+ " scanner will wait before starting the next iteration."
|
||||
)
|
||||
public void setDataScanInterval(long dataScanInterval) {
|
||||
this.dataScanInterval = dataScanInterval;
|
||||
}
|
||||
|
||||
public long getDataScanInterval() {
|
||||
return dataScanInterval;
|
||||
}
|
||||
|
||||
@Config(key = "volume.bytes.per.second",
|
||||
type = ConfigType.LONG,
|
||||
defaultValue = "1048576",
|
||||
|
|
|
@ -172,22 +172,18 @@ public class OzoneContainer {
|
|||
ContainerScrubberConfiguration c = config.getObject(
|
||||
ContainerScrubberConfiguration.class);
|
||||
boolean enabled = c.isEnabled();
|
||||
long metadataScanInterval = c.getMetadataScanInterval();
|
||||
long bytesPerSec = c.getBandwidthPerVolume();
|
||||
|
||||
if (!enabled) {
|
||||
LOG.info("Background container scanner has been disabled.");
|
||||
} else {
|
||||
if (this.metadataScanner == null) {
|
||||
this.metadataScanner = new ContainerMetadataScanner(config, controller,
|
||||
metadataScanInterval);
|
||||
this.metadataScanner = new ContainerMetadataScanner(c, controller);
|
||||
}
|
||||
this.metadataScanner.start();
|
||||
|
||||
dataScanners = new ArrayList<>();
|
||||
for (HddsVolume v : volumeSet.getVolumesList()) {
|
||||
ContainerDataScanner s = new ContainerDataScanner(config, controller,
|
||||
v, bytesPerSec);
|
||||
ContainerDataScanner s = new ContainerDataScanner(c, controller, v);
|
||||
s.start();
|
||||
dataScanners.add(s);
|
||||
}
|
||||
|
|
|
@ -96,12 +96,11 @@ public class TestContainerSet {
|
|||
|
||||
assertEquals(10, containerSet.containerCount());
|
||||
|
||||
// Using containerIterator.
|
||||
Iterator<Container> containerIterator = containerSet.getContainerIterator();
|
||||
Iterator<Container<?>> iterator = containerSet.getContainerIterator();
|
||||
|
||||
int count = 0;
|
||||
while(containerIterator.hasNext()) {
|
||||
Container kv = containerIterator.next();
|
||||
while(iterator.hasNext()) {
|
||||
Container kv = iterator.next();
|
||||
ContainerData containerData = kv.getContainerData();
|
||||
long containerId = containerData.getContainerID();
|
||||
if (containerId%2 == 0) {
|
||||
|
@ -116,7 +115,7 @@ public class TestContainerSet {
|
|||
assertEquals(10, count);
|
||||
|
||||
//Using containerMapIterator.
|
||||
Iterator<Map.Entry<Long, Container>> containerMapIterator = containerSet
|
||||
Iterator<Map.Entry<Long, Container<?>>> containerMapIterator = containerSet
|
||||
.getContainerMapIterator();
|
||||
|
||||
count = 0;
|
||||
|
@ -160,26 +159,25 @@ public class TestContainerSet {
|
|||
containerSet.addContainer(kv);
|
||||
}
|
||||
|
||||
Iterator<Container> iter1 = containerSet.getContainerIterator(vol1);
|
||||
Iterator<Container<?>> iter1 = containerSet.getContainerIterator(vol1);
|
||||
int count1 = 0;
|
||||
while (iter1.hasNext()) {
|
||||
Container c = iter1.next();
|
||||
assertTrue((c.getContainerData().getContainerID() % 2) == 0);
|
||||
assertEquals(0, (c.getContainerData().getContainerID() % 2));
|
||||
count1++;
|
||||
}
|
||||
assertEquals(5, count1);
|
||||
|
||||
Iterator<Container> iter2 = containerSet.getContainerIterator(vol2);
|
||||
Iterator<Container<?>> iter2 = containerSet.getContainerIterator(vol2);
|
||||
int count2 = 0;
|
||||
while (iter2.hasNext()) {
|
||||
Container c = iter2.next();
|
||||
assertTrue((c.getContainerData().getContainerID() % 2) == 1);
|
||||
assertEquals(1, (c.getContainerData().getContainerID() % 2));
|
||||
count2++;
|
||||
}
|
||||
assertEquals(5, count2);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGetContainerReport() throws IOException {
|
||||
|
||||
|
|
|
@ -27,7 +27,8 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* This test verifies the container scrubber metrics functionality.
|
||||
|
@ -42,8 +43,7 @@ public class TestContainerScrubberMetrics {
|
|||
HddsVolume vol = Mockito.mock(HddsVolume.class);
|
||||
ContainerController cntrl = mockContainerController(vol);
|
||||
|
||||
ContainerMetadataScanner mc = new ContainerMetadataScanner(conf,
|
||||
cntrl, c.getMetadataScanInterval());
|
||||
ContainerMetadataScanner mc = new ContainerMetadataScanner(c, cntrl);
|
||||
mc.runIteration();
|
||||
|
||||
Assert.assertEquals(1, mc.getMetrics().getNumScanIterations());
|
||||
|
@ -56,11 +56,11 @@ public class TestContainerScrubberMetrics {
|
|||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
ContainerScrubberConfiguration c = conf.getObject(
|
||||
ContainerScrubberConfiguration.class);
|
||||
c.setDataScanInterval(0);
|
||||
HddsVolume vol = Mockito.mock(HddsVolume.class);
|
||||
ContainerController cntrl = mockContainerController(vol);
|
||||
|
||||
ContainerDataScanner sc = new ContainerDataScanner(conf, cntrl,
|
||||
vol, c.getBandwidthPerVolume());
|
||||
ContainerDataScanner sc = new ContainerDataScanner(c, cntrl, vol);
|
||||
sc.runIteration();
|
||||
|
||||
ContainerDataScrubberMetrics m = sc.getMetrics();
|
||||
|
@ -71,7 +71,7 @@ public class TestContainerScrubberMetrics {
|
|||
|
||||
private ContainerController mockContainerController(HddsVolume vol) {
|
||||
// healthy container
|
||||
Container c1 = Mockito.mock(Container.class);
|
||||
Container<ContainerData> c1 = Mockito.mock(Container.class);
|
||||
Mockito.when(c1.shouldScanData()).thenReturn(true);
|
||||
Mockito.when(c1.scanMetaData()).thenReturn(true);
|
||||
Mockito.when(c1.scanData(
|
||||
|
@ -81,7 +81,7 @@ public class TestContainerScrubberMetrics {
|
|||
// unhealthy container (corrupt data)
|
||||
ContainerData c2d = Mockito.mock(ContainerData.class);
|
||||
Mockito.when(c2d.getContainerID()).thenReturn(101L);
|
||||
Container c2 = Mockito.mock(Container.class);
|
||||
Container<ContainerData> c2 = Mockito.mock(Container.class);
|
||||
Mockito.when(c2.scanMetaData()).thenReturn(true);
|
||||
Mockito.when(c2.shouldScanData()).thenReturn(true);
|
||||
Mockito.when(c2.scanData(
|
||||
|
@ -92,20 +92,17 @@ public class TestContainerScrubberMetrics {
|
|||
// unhealthy container (corrupt metadata)
|
||||
ContainerData c3d = Mockito.mock(ContainerData.class);
|
||||
Mockito.when(c3d.getContainerID()).thenReturn(102L);
|
||||
Container c3 = Mockito.mock(Container.class);
|
||||
Container<ContainerData> c3 = Mockito.mock(Container.class);
|
||||
Mockito.when(c3.shouldScanData()).thenReturn(false);
|
||||
Mockito.when(c3.scanMetaData()).thenReturn(false);
|
||||
Mockito.when(c3.getContainerData()).thenReturn(c3d);
|
||||
|
||||
Iterator<Container> iter = Mockito.mock(Iterator.class);
|
||||
Mockito.when(iter.hasNext()).thenReturn(true, true, true, false);
|
||||
Mockito.when(iter.next()).thenReturn(c1, c2, c3);
|
||||
|
||||
Collection<Container<?>> containers = Arrays.asList(c1, c2, c3);
|
||||
ContainerController cntrl = Mockito.mock(ContainerController.class);
|
||||
Mockito.when(cntrl.getContainers(vol))
|
||||
.thenReturn(iter);
|
||||
.thenReturn(containers.iterator());
|
||||
Mockito.when(cntrl.getContainers())
|
||||
.thenReturn(iter);
|
||||
.thenReturn(containers.iterator());
|
||||
|
||||
return cntrl;
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ public class TestBlockDeletingService {
|
|||
BlockDeletingServiceTestImpl svc =
|
||||
getBlockDeletinService(containerSet, conf, 1000);
|
||||
svc.start();
|
||||
GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000);
|
||||
GenericTestUtils.waitFor(svc::isStarted, 100, 3000);
|
||||
|
||||
// Ensure 1 container was created
|
||||
List<ContainerData> containerData = Lists.newArrayList();
|
||||
|
@ -210,7 +210,7 @@ public class TestBlockDeletingService {
|
|||
|
||||
try(ReferenceCountedDB meta = BlockUtils.getDB(
|
||||
(KeyValueContainerData) containerData.get(0), conf)) {
|
||||
Map<Long, Container> containerMap = containerSet.getContainerMapCopy();
|
||||
Map<Long, Container<?>> containerMap = containerSet.getContainerMapCopy();
|
||||
// NOTE: this test assumes that all the container is KetValueContainer and
|
||||
// have DeleteTransactionId in KetValueContainerData. If other
|
||||
// types is going to be added, this test should be checked.
|
||||
|
@ -261,7 +261,7 @@ public class TestBlockDeletingService {
|
|||
BlockDeletingServiceTestImpl service =
|
||||
getBlockDeletinService(containerSet, conf, 1000);
|
||||
service.start();
|
||||
GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
|
||||
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
|
||||
|
||||
// Run some deleting tasks and verify there are threads running
|
||||
service.runDeletingTasks();
|
||||
|
@ -340,7 +340,7 @@ public class TestBlockDeletingService {
|
|||
|
||||
// The block deleting successfully and shouldn't catch timed
|
||||
// out warning log.
|
||||
Assert.assertTrue(!newLog.getOutput().contains(
|
||||
Assert.assertFalse(newLog.getOutput().contains(
|
||||
"Background task executes timed out, retrying in next interval"));
|
||||
}
|
||||
svc.shutdown();
|
||||
|
@ -351,9 +351,7 @@ public class TestBlockDeletingService {
|
|||
OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class);
|
||||
Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
|
||||
Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null);
|
||||
BlockDeletingServiceTestImpl service =
|
||||
new BlockDeletingServiceTestImpl(ozoneContainer, timeout, conf);
|
||||
return service;
|
||||
return new BlockDeletingServiceTestImpl(ozoneContainer, timeout, conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
|
@ -382,7 +380,7 @@ public class TestBlockDeletingService {
|
|||
service.start();
|
||||
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
|
||||
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
|
||||
// 1st interval processes 1 container 1 block and 10 chunks
|
||||
deleteAndWait(service, 1);
|
||||
Assert.assertEquals(10, getNumberOfChunksInContainers(containerSet));
|
||||
|
@ -395,7 +393,7 @@ public class TestBlockDeletingService {
|
|||
if (getNumberOfChunksInContainers(containerSet) == 0) {
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {}
|
||||
} catch (Exception ignored) {}
|
||||
return false;
|
||||
}, 100, 100000);
|
||||
Assert.assertEquals(0, getNumberOfChunksInContainers(containerSet));
|
||||
|
@ -436,7 +434,7 @@ public class TestBlockDeletingService {
|
|||
service.start();
|
||||
|
||||
try {
|
||||
GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
|
||||
GenericTestUtils.waitFor(service::isStarted, 100, 3000);
|
||||
// Total blocks = 3 * 5 = 15
|
||||
// block per task = 2
|
||||
// number of containers = 5
|
||||
|
@ -453,10 +451,10 @@ public class TestBlockDeletingService {
|
|||
}
|
||||
|
||||
private int getNumberOfChunksInContainers(ContainerSet containerSet) {
|
||||
Iterator<Container> containerIterator = containerSet.getContainerIterator();
|
||||
Iterator<Container<?>> iterator = containerSet.getContainerIterator();
|
||||
int numChunks = 0;
|
||||
while (containerIterator.hasNext()) {
|
||||
Container container = containerIterator.next();
|
||||
while (iterator.hasNext()) {
|
||||
Container container = iterator.next();
|
||||
File chunkDir = FileUtils.getFile(
|
||||
((KeyValueContainerData) container.getContainerData())
|
||||
.getChunksPath());
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
|||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerMetadataScanner;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
|
@ -159,8 +160,10 @@ public class TestDataScrubber {
|
|||
deleteDirectory(chunksDir);
|
||||
Assert.assertFalse(chunksDir.exists());
|
||||
|
||||
ContainerMetadataScanner sb = new ContainerMetadataScanner(ozoneConfig,
|
||||
oc.getController(), 0);
|
||||
ContainerScrubberConfiguration conf = ozoneConfig.getObject(
|
||||
ContainerScrubberConfiguration.class);
|
||||
ContainerMetadataScanner sb = new ContainerMetadataScanner(conf,
|
||||
oc.getController());
|
||||
sb.scrub(c);
|
||||
|
||||
// wait for the incremental container report to propagate to SCM
|
||||
|
|
Loading…
Reference in New Issue