Add support for checksum verification in data scrubber

Signed-off-by: Anu Engineer <aengineer@apache.org>
This commit is contained in:
Hrishikesh Gadre 2019-07-11 15:08:56 -07:00 committed by Anu Engineer
parent 3db7184082
commit f347c348d8
15 changed files with 543 additions and 266 deletions

View File

@ -65,15 +65,12 @@ public final class HddsConfigKeys {
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
public static final String HDDS_SCM_SAFEMODE_ENABLED = public static final String HDDS_SCM_SAFEMODE_ENABLED =
"hdds.scm.safemode.enabled"; "hdds.scm.safemode.enabled";
public static final String HDDS_CONTAINERSCRUB_ENABLED =
"hdds.containerscrub.enabled";
public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false;
public static final boolean HDDS_SCM_SAFEMODE_ENABLED_DEFAULT = true; public static final boolean HDDS_SCM_SAFEMODE_ENABLED_DEFAULT = true;
public static final String HDDS_SCM_SAFEMODE_MIN_DATANODE = public static final String HDDS_SCM_SAFEMODE_MIN_DATANODE =
"hdds.scm.safemode.min.datanode"; "hdds.scm.safemode.min.datanode";
public static final int HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT = 1; public static final int HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT = 1;
public static final String public static final String
HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT = HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT =
"hdds.scm.wait.time.after.safemode.exit"; "hdds.scm.wait.time.after.safemode.exit";

View File

@ -40,9 +40,14 @@ public class ChecksumData {
private List<ByteString> checksums; private List<ByteString> checksums;
public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) { public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) {
this(checksumType, bytesPerChecksum, Lists.newArrayList());
}
public ChecksumData(ChecksumType checksumType, int bytesPerChecksum,
List<ByteString> checksums) {
this.type = checksumType; this.type = checksumType;
this.bytesPerChecksum = bytesPerChecksum; this.bytesPerChecksum = bytesPerChecksum;
this.checksums = Lists.newArrayList(); this.checksums = checksums;
} }
/** /**

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.container.common.helpers import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException; .StorageContainerException;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@ -153,9 +155,27 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
void updateBlockCommitSequenceId(long blockCommitSequenceId); void updateBlockCommitSequenceId(long blockCommitSequenceId);
/** /**
* check and report the structural integrity of the container. * Scan the container metadata to detect corruption.
* @return true if the integrity checks pass */
boolean scanMetaData();
/**
* Return if the container data should be checksum verified to detect
* corruption. The result depends upon the current state of the container
* (e.g. if a container is accepting writes, it may not be a good idea to
* perform checksum verification to avoid concurrency issues).
*/
boolean shouldScanData();
/**
* Perform checksum verification for the container data.
*
* @param throttler A reference of {@link DataTransferThrottler} used to
* perform I/O bandwidth throttling
* @param canceler A reference of {@link Canceler} used to cancel the
* I/O bandwidth throttling (e.g. for shutdown purpose).
* @return true if the checksum verification succeeds
* false otherwise * false otherwise
*/ */
boolean check(); boolean scanData(DataTransferThrottler throttler, Canceler canceler);
} }

View File

@ -39,6 +39,8 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException; .StorageContainerException;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
@ -671,52 +673,33 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
.getContainerID() + OzoneConsts.DN_CONTAINER_DB); .getContainerID() + OzoneConsts.DN_CONTAINER_DB);
} }
/** public boolean scanMetaData() {
* run integrity checks on the Container metadata.
*/
public boolean check() {
ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
long containerId = containerData.getContainerID(); long containerId = containerData.getContainerID();
KeyValueContainerCheck checker =
new KeyValueContainerCheck(containerData.getMetadataPath(), config,
containerId);
return checker.fastCheck();
}
switch (containerData.getState()) { @Override
case OPEN: public boolean shouldScanData() {
level = ContainerCheckLevel.FAST_CHECK; return containerData.getState() == ContainerDataProto.State.CLOSED
LOG.info("Doing Fast integrity checks for Container ID : {}," || containerData.getState() == ContainerDataProto.State.QUASI_CLOSED;
+ " because it is OPEN", containerId); }
break;
case CLOSING: public boolean scanData(DataTransferThrottler throttler, Canceler canceler) {
level = ContainerCheckLevel.FAST_CHECK; if (!shouldScanData()) {
LOG.info("Doing Fast integrity checks for Container ID : {}," throw new IllegalStateException("The checksum verification can not be" +
+ " because it is CLOSING", containerId); " done for container in state "
break; + containerData.getState());
case CLOSED:
case QUASI_CLOSED:
level = ContainerCheckLevel.FULL_CHECK;
LOG.debug("Doing Full integrity checks for Container ID : {},"
+ " because it is in {} state", containerId,
containerData.getState());
break;
default:
break;
}
if (level == ContainerCheckLevel.NO_CHECK) {
LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
return true;
} }
long containerId = containerData.getContainerID();
KeyValueContainerCheck checker = KeyValueContainerCheck checker =
new KeyValueContainerCheck(containerData.getMetadataPath(), config, new KeyValueContainerCheck(containerData.getMetadataPath(), config,
containerId); containerId);
switch (level) { return checker.fullCheck(throttler, canceler);
case FAST_CHECK:
return checker.fastCheck();
case FULL_CHECK:
return checker.fullCheck();
default:
return true;
}
} }
private enum ContainerCheckLevel { private enum ContainerCheckLevel {

View File

@ -22,6 +22,11 @@ import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@ -30,12 +35,15 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.io.InputStream;
import java.util.Arrays;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -101,13 +109,13 @@ public class KeyValueContainerCheck {
* *
* @return true : integrity checks pass, false : otherwise. * @return true : integrity checks pass, false : otherwise.
*/ */
public boolean fullCheck() { public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) {
boolean valid = false; boolean valid = false;
try { try {
valid = fastCheck(); valid = fastCheck();
if (valid) { if (valid) {
checkBlockDB(); scanData(throttler, canceler);
} }
} catch (IOException e) { } catch (IOException e) {
handleCorruption(e); handleCorruption(e);
@ -194,7 +202,8 @@ public class KeyValueContainerCheck {
} }
} }
private void checkBlockDB() throws IOException { private void scanData(DataTransferThrottler throttler, Canceler canceler)
throws IOException {
/** /**
* Check the integrity of the DB inside each container. * Check the integrity of the DB inside each container.
* In Scope: * In Scope:
@ -220,43 +229,67 @@ public class KeyValueContainerCheck {
throw new IOException(dbFileErrorMsg); throw new IOException(dbFileErrorMsg);
} }
onDiskContainerData.setDbFile(dbFile); onDiskContainerData.setDbFile(dbFile);
try(ReferenceCountedDB db = try(ReferenceCountedDB db =
BlockUtils.getDB(onDiskContainerData, checkConfig)) { BlockUtils.getDB(onDiskContainerData, checkConfig);
iterateBlockDB(db); KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
} new File(onDiskContainerData.getContainerPath()))) {
}
private void iterateBlockDB(ReferenceCountedDB db) while(kvIter.hasNext()) {
throws IOException {
Preconditions.checkState(db != null);
// get "normal" keys from the Block DB
try(KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
new File(onDiskContainerData.getContainerPath()))) {
// ensure there is a chunk file for each key in the DB
while (kvIter.hasNext()) {
BlockData block = kvIter.nextBlock(); BlockData block = kvIter.nextBlock();
for(ContainerProtos.ChunkInfo chunk : block.getChunks()) {
List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks(); File chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
File chunkFile;
chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
ChunkInfo.getFromProtoBuf(chunk)); ChunkInfo.getFromProtoBuf(chunk));
if (!chunkFile.exists()) { if (!chunkFile.exists()) {
// concurrent mutation in Block DB? lookup the block again. // concurrent mutation in Block DB? lookup the block again.
byte[] bdata = db.getStore().get( byte[] bdata = db.getStore().get(
Longs.toByteArray(block.getBlockID().getLocalID())); Longs.toByteArray(block.getBlockID().getLocalID()));
if (bdata == null) { if (bdata != null) {
LOG.trace("concurrency with delete, ignoring deleted block"); throw new IOException("Missing chunk file "
break; // skip to next block from kvIter + chunkFile.getAbsolutePath());
} else { }
String errorStr = "Missing chunk file " } else if (chunk.getChecksumData().getType()
+ chunkFile.getAbsolutePath(); != ContainerProtos.ChecksumType.NONE){
throw new IOException(errorStr); int length = chunk.getChecksumData().getChecksumsList().size();
ChecksumData cData = new ChecksumData(
chunk.getChecksumData().getType(),
chunk.getChecksumData().getBytesPerChecksum(),
chunk.getChecksumData().getChecksumsList());
long bytesRead = 0;
byte[] buffer = new byte[cData.getBytesPerChecksum()];
try (InputStream fs = new FileInputStream(chunkFile)) {
int i = 0, v = 0;
for (; i < length; i++) {
v = fs.read(buffer);
if (v == -1) {
break;
}
bytesRead += v;
throttler.throttle(v, canceler);
Checksum cal = new Checksum(cData.getChecksumType(),
cData.getBytesPerChecksum());
ByteString expected = cData.getChecksums().get(i);
ByteString actual = cal.computeChecksum(buffer)
.getChecksums().get(0);
if (!Arrays.equals(expected.toByteArray(),
actual.toByteArray())) {
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d expected" +
" checksum %s actual checksum %s for block %s",
chunk.getChunkName(), chunk.getLen(),
Arrays.toString(expected.toByteArray()),
Arrays.toString(actual.toByteArray()),
block.getBlockID()));
}
}
if (v == -1 && i < length) {
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s expected length=%d"
+ " actual length=%d for block %s",
chunk.getChunkName(),
chunk.getLen(), bytesRead, block.getBlockID()));
}
} }
} }
} }

View File

@ -65,12 +65,12 @@ public final class ChunkManagerFactory {
if (!persist) { if (!persist) {
boolean scrubber = config.getBoolean( boolean scrubber = config.getBoolean(
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, "hdds.containerscrub.enabled",
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT); false);
if (scrubber) { if (scrubber) {
// Data Scrubber needs to be disabled for non-persistent chunks. // Data Scrubber needs to be disabled for non-persistent chunks.
LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false." LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false."
+ " Please set " + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED + " Please set hdds.containerscrub.enabled"
+ " also to false to enable non-persistent containers."); + " also to false to enable non-persistent containers.");
persist = true; persist = true;
} }

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* VolumeScanner scans a single volume. Each VolumeScanner has its own thread.
* <p>They are all managed by the DataNode's BlockScanner.
*/
public class ContainerDataScanner extends Thread {
public static final Logger LOG =
LoggerFactory.getLogger(ContainerDataScanner.class);
/**
* The volume that we're scanning.
*/
private final HddsVolume volume;
private final ContainerController controller;
private final DataTransferThrottler throttler;
private final Canceler canceler;
/**
* True if the thread is stopping.<p/>
* Protected by this object's lock.
*/
private volatile boolean stopping = false;
public ContainerDataScanner(ContainerController controller,
HddsVolume volume, long bytesPerSec) {
this.controller = controller;
this.volume = volume;
this.throttler = new DataTransferThrottler(bytesPerSec);
this.canceler = new Canceler();
setName("ContainerDataScanner(" + volume + ")");
setDaemon(true);
}
@Override
public void run() {
LOG.trace("{}: thread starting.", this);
try {
while (!stopping) {
Iterator<Container> itr = controller.getContainers(volume);
while (!stopping && itr.hasNext()) {
Container c = itr.next();
try {
if (c.shouldScanData()) {
if(!c.scanData(throttler, canceler)) {
controller.markContainerUnhealthy(
c.getContainerData().getContainerID());
}
}
} catch (IOException ex) {
long containerId = c.getContainerData().getContainerID();
LOG.warn("Unexpected exception while scanning container "
+ containerId, ex);
}
}
}
LOG.info("{} exiting.", this);
} catch (Throwable e) {
LOG.error("{} exiting because of exception ", this, e);
}
}
public synchronized void shutdown() {
this.stopping = true;
this.canceler.cancel("ContainerDataScanner("+volume+") is shutting down");
this.interrupt();
try {
this.join();
} catch (InterruptedException ex) {
LOG.warn("Unexpected exception while stopping data scanner for volume "
+ volume, ex);
}
}
@Override
public String toString() {
return "ContainerDataScanner(" + volume +
", " + volume.getStorageID() + ")";
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* This class is responsible to perform metadata verification of the
* containers.
*/
public class ContainerMetadataScanner extends Thread {
public static final Logger LOG =
LoggerFactory.getLogger(ContainerMetadataScanner.class);
private final ContainerController controller;
private final long metadataScanInterval;
/**
* True if the thread is stopping.<p/>
* Protected by this object's lock.
*/
private boolean stopping = false;
public ContainerMetadataScanner(ContainerController controller,
long metadataScanInterval) {
this.controller = controller;
this.metadataScanInterval = metadataScanInterval;
setName("ContainerMetadataScanner");
setDaemon(true);
}
@Override
public void run() {
/**
* the outer daemon loop exits on down()
*/
LOG.info("Background ContainerMetadataScanner starting up");
while (!stopping) {
long start = System.nanoTime();
scrub();
long interval = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start);
// ensure to delay next metadata scan with respect to user config.
if (!stopping && interval < metadataScanInterval) {
try {
Thread.sleep(metadataScanInterval - interval);
} catch (InterruptedException e) {
LOG.info("Background ContainerMetadataScanner interrupted." +
" Going to exit");
}
}
}
}
private void scrub() {
Iterator<Container> containerIt = controller.getContainers();
long count = 0;
while (!stopping && containerIt.hasNext()) {
Container container = containerIt.next();
try {
scrub(container);
} catch (IOException e) {
LOG.info("Unexpected error while scrubbing container {}",
container.getContainerData().getContainerID());
}
count++;
}
LOG.debug("iterator ran integrity checks on {} containers", count);
}
@VisibleForTesting
public void scrub(Container container) throws IOException {
if (!container.scanMetaData()) {
controller.markContainerUnhealthy(
container.getContainerData().getContainerID());
}
}
public synchronized void shutdown() {
this.stopping = true;
this.interrupt();
try {
this.join();
} catch (InterruptedException ex) {
LOG.warn("Unexpected exception while stopping metadata scanner.", ex);
}
}
}

View File

@ -1,158 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.net.ntp.TimeStamp;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
/**
* Background Metadata scrubbing for Ozone Containers.
* Future scope : data(chunks) checksum verification.
*/
public class ContainerScrubber implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerScrubber.class);
private final OzoneConfiguration config;
private final long timePerContainer = 10000; // 10 sec in millis
private boolean halt;
private Thread scrubThread;
private ContainerController controller;
public ContainerScrubber(OzoneConfiguration conf,
ContainerController controller) {
this.config = conf;
this.halt = false;
this.scrubThread = null;
this.controller = controller;
}
@Override public void run() {
/**
* the outer daemon loop exits on down()
*/
LOG.info("Background ContainerScrubber starting up");
while (true) {
scrub();
if (this.halt) {
break; // stop and exit if requested
}
try {
Thread.sleep(300000); /* 5 min between scans */
} catch (InterruptedException e) {
LOG.info("Background ContainerScrubber interrupted. Going to exit");
}
}
}
/**
* Start the scrub scanner thread.
*/
public void up() {
this.halt = false;
if (this.scrubThread == null) {
this.scrubThread = new Thread(this);
scrubThread.start();
} else {
LOG.info("Scrubber up called multiple times. Scrub thread already up.");
}
}
/**
* Stop the scrub scanner thread. Wait for thread to exit
*/
public void down() {
this.halt = true;
if (scrubThread == null) {
LOG.info("Scrubber down invoked, but scrub thread is not running");
return;
}
this.scrubThread.interrupt();
try {
this.scrubThread.join();
} catch (Exception e) {
LOG.warn("Exception when waiting for Container Scrubber thread ", e);
} finally {
this.scrubThread = null;
}
}
/**
* Current implementation : fixed rate scrub, no feedback loop.
* Dynamic throttling based on system load monitoring to be
* implemented later as jira [XXX]
*
* @param startTime
*/
private void throttleScrubber(TimeStamp startTime) {
TimeStamp endTime = new TimeStamp(System.currentTimeMillis());
long timeTaken = endTime.getTime() - startTime.getTime();
if (timeTaken < timePerContainer) {
try {
Thread.sleep(timePerContainer - timeTaken);
} catch (InterruptedException e) {
LOG.debug("Ignoring interrupted sleep inside throttle");
}
}
}
private void scrub() {
Iterator<Container> containerIt = controller.getContainers();
long count = 0;
while (containerIt.hasNext() && !halt) {
TimeStamp startTime = new TimeStamp(System.currentTimeMillis());
Container container = containerIt.next();
try {
scrub(container);
} catch (IOException e) {
LOG.info("Unexpected error while scrubbing container {}",
container.getContainerData().getContainerID());
}
count++;
throttleScrubber(startTime);
}
LOG.debug("iterator ran integrity checks on {} containers", count);
}
@VisibleForTesting
public void scrub(Container container) throws IOException {
if (!container.check()) {
controller.markContainerUnhealthy(
container.getContainerData().getContainerID());
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
/**
* This class defines configuration parameters for container scrubber.
**/
@ConfigGroup(prefix = "hdds.containerscrub")
public class ContainerScrubberConfiguration {
private boolean enabled;
private long metadataScanInterval;
private long bandwidthPerVolume;
@Config(key = "enabled",
type = ConfigType.BOOLEAN,
defaultValue = "false",
tags = {ConfigTag.STORAGE},
description = "Config parameter to enable container scrubber.")
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public boolean isEnabled() {
return enabled;
}
@Config(key = "metadata.scan.interval",
type = ConfigType.TIME,
defaultValue = "3h",
tags = {ConfigTag.STORAGE},
description = "Config parameter define time interval in milliseconds" +
" between two metadata scans by container scrubber.")
public void setMetadataScanInterval(long metadataScanInterval) {
this.metadataScanInterval = metadataScanInterval;
}
public long getMetadataScanInterval() {
return metadataScanInterval;
}
@Config(key = "volume.bytes.per.second",
type = ConfigType.LONG,
defaultValue = "1048576",
tags = {ConfigTag.STORAGE},
description = "Config parameter to throttle I/O bandwidth used"
+ " by scrubber per volume.")
public void setBandwidthPerVolume(long bandwidthPerVolume) {
this.bandwidthPerVolume = bandwidthPerVolume;
}
public long getBandwidthPerVolume() {
return bandwidthPerVolume;
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto import org.apache.hadoop.hdds.protocol.datanode.proto
@ -53,6 +52,7 @@ import org.slf4j.LoggerFactory;
import java.io.*; import java.io.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -75,7 +75,8 @@ public class OzoneContainer {
private final XceiverServerSpi writeChannel; private final XceiverServerSpi writeChannel;
private final XceiverServerSpi readChannel; private final XceiverServerSpi readChannel;
private final ContainerController controller; private final ContainerController controller;
private ContainerScrubber scrubber; private ContainerMetadataScanner metadataScanner;
private List<ContainerDataScanner> dataScanners;
private final BlockDeletingService blockDeletingService; private final BlockDeletingService blockDeletingService;
/** /**
@ -92,7 +93,7 @@ public class OzoneContainer {
this.config = conf; this.config = conf;
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
this.containerSet = new ContainerSet(); this.containerSet = new ContainerSet();
this.scrubber = null; this.metadataScanner = null;
buildContainerSet(); buildContainerSet();
final ContainerMetrics metrics = ContainerMetrics.create(conf); final ContainerMetrics metrics = ContainerMetrics.create(conf);
@ -167,18 +168,28 @@ public class OzoneContainer {
* Start background daemon thread for performing container integrity checks. * Start background daemon thread for performing container integrity checks.
*/ */
private void startContainerScrub() { private void startContainerScrub() {
boolean enabled = config.getBoolean( ContainerScrubberConfiguration c = config.getObject(
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, ContainerScrubberConfiguration.class);
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT); boolean enabled = c.isEnabled();
long metadataScanInterval = c.getMetadataScanInterval();
long bytesPerSec = c.getBandwidthPerVolume();
if (!enabled) { if (!enabled) {
LOG.info("Background container scrubber has been disabled by {}", LOG.info("Background container scanner has been disabled.");
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED);
} else { } else {
if (this.scrubber == null) { if (this.metadataScanner == null) {
this.scrubber = new ContainerScrubber(config, controller); this.metadataScanner = new ContainerMetadataScanner(controller,
metadataScanInterval);
}
this.metadataScanner.start();
dataScanners = new ArrayList<>();
for (HddsVolume v : volumeSet.getVolumesList()) {
ContainerDataScanner s = new ContainerDataScanner(controller,
v, bytesPerSec);
s.start();
dataScanners.add(s);
} }
scrubber.up();
} }
} }
@ -186,10 +197,14 @@ public class OzoneContainer {
* Stop the scanner thread and wait for thread to die. * Stop the scanner thread and wait for thread to die.
*/ */
private void stopContainerScrub() { private void stopContainerScrub() {
if (scrubber == null) { if (metadataScanner == null) {
return; return;
} }
scrubber.down(); metadataScanner.shutdown();
metadataScanner = null;
for (ContainerDataScanner s : dataScanners) {
s.shutdown();
}
} }
/** /**

View File

@ -19,21 +19,26 @@
package org.apache.hadoop.ozone.container.keyvalue; package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.junit.After; import org.junit.After;
@ -42,7 +47,9 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.RandomAccessFile;
import java.util.Arrays; import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -55,7 +62,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
/** /**
* Basic sanity test for the KeyValueContainerCheck class. * Basic sanity test for the KeyValueContainerCheck class.
@ -66,7 +76,7 @@ import static org.junit.Assert.assertTrue;
private KeyValueContainerData containerData; private KeyValueContainerData containerData;
private ChunkManagerImpl chunkManager; private ChunkManagerImpl chunkManager;
private VolumeSet volumeSet; private VolumeSet volumeSet;
private Configuration conf; private OzoneConfiguration conf;
private File testRoot; private File testRoot;
public TestKeyValueContainerCheck(String metadataImpl) { public TestKeyValueContainerCheck(String metadataImpl) {
@ -95,12 +105,15 @@ import static org.junit.Assert.assertTrue;
* Sanity test, when there are no corruptions induced. * Sanity test, when there are no corruptions induced.
* @throws Exception * @throws Exception
*/ */
@Test public void testKeyValueContainerCheckNoCorruption() throws Exception { @Test
public void testKeyValueContainerCheckNoCorruption() throws Exception {
long containerID = 101; long containerID = 101;
int deletedBlocks = 1; int deletedBlocks = 1;
int normalBlocks = 3; int normalBlocks = 3;
int chunksPerBlock = 4; int chunksPerBlock = 4;
boolean valid = false; boolean valid = false;
ContainerScrubberConfiguration c = conf.getObject(
ContainerScrubberConfiguration.class);
// test Closed Container // test Closed Container
createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536, createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536,
@ -120,10 +133,70 @@ import static org.junit.Assert.assertTrue;
container.close(); container.close();
// next run checks on a Closed Container // next run checks on a Closed Container
valid = kvCheck.fullCheck(); valid = kvCheck.fullCheck(new DataTransferThrottler(
c.getBandwidthPerVolume()), null);
assertTrue(valid); assertTrue(valid);
} }
/**
* Sanity test, when there are corruptions induced.
* @throws Exception
*/
@Test
public void testKeyValueContainerCheckCorruption() throws Exception {
long containerID = 102;
int deletedBlocks = 1;
int normalBlocks = 3;
int chunksPerBlock = 4;
boolean valid = false;
ContainerScrubberConfiguration sc = conf.getObject(
ContainerScrubberConfiguration.class);
// test Closed Container
createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536,
chunksPerBlock);
File chunksPath = new File(containerData.getChunksPath());
assertTrue(chunksPath.listFiles().length
== (deletedBlocks + normalBlocks) * chunksPerBlock);
container.close();
KeyValueContainerCheck kvCheck =
new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
containerID);
File metaDir = new File(containerData.getMetadataPath());
File dbFile = KeyValueContainerLocationUtil
.getContainerDBFile(metaDir, containerID);
containerData.setDbFile(dbFile);
try(ReferenceCountedDB db =
BlockUtils.getDB(containerData, conf);
KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
new File(containerData.getContainerPath()))) {
BlockData block = kvIter.nextBlock();
assertTrue(!block.getChunks().isEmpty());
ContainerProtos.ChunkInfo c = block.getChunks().get(0);
File chunkFile = ChunkUtils.getChunkFile(containerData,
ChunkInfo.getFromProtoBuf(c));
long length = chunkFile.length();
assertTrue(length > 0);
// forcefully truncate the file to induce failure.
try (RandomAccessFile file = new RandomAccessFile(chunkFile, "rws")) {
file.setLength(length / 2);
}
assertEquals(length/2, chunkFile.length());
}
// metadata check should pass.
valid = kvCheck.fastCheck();
assertTrue(valid);
// checksum validation should fail.
valid = kvCheck.fullCheck(new DataTransferThrottler(
sc.getBandwidthPerVolume()), null);
assertFalse(valid);
}
/** /**
* Creates a container with normal and deleted blocks. * Creates a container with normal and deleted blocks.
* First it will insert normal blocks, and then it will insert * First it will insert normal blocks, and then it will insert
@ -134,12 +207,15 @@ import static org.junit.Assert.assertTrue;
* @throws Exception * @throws Exception
*/ */
private void createContainerWithBlocks(long containerId, int normalBlocks, private void createContainerWithBlocks(long containerId, int normalBlocks,
int deletedBlocks, long chunkLen, int chunksPerBlock) throws Exception { int deletedBlocks, int chunkLen, int chunksPerBlock) throws Exception {
long chunkCount; long chunkCount;
String strBlock = "block"; String strBlock = "block";
String strChunk = "-chunkFile"; String strChunk = "-chunkFile";
byte[] chunkData = new byte[(int) chunkLen];
long totalBlks = normalBlocks + deletedBlocks; long totalBlks = normalBlocks + deletedBlocks;
Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256,
chunkLen);
byte[] chunkData = generateRandomData(chunkLen);
ChecksumData checksumData = checksum.computeChecksum(chunkData);
containerData = new KeyValueContainerData(containerId, containerData = new KeyValueContainerData(containerId,
(long) StorageUnit.BYTES.toBytes( (long) StorageUnit.BYTES.toBytes(
@ -166,8 +242,8 @@ import static org.junit.Assert.assertTrue;
chunkList.clear(); chunkList.clear();
for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) { for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
String chunkName = strBlock + i + strChunk + chunkCount; String chunkName = strBlock + i + strChunk + chunkCount;
long offset = chunkCount * chunkLen; ChunkInfo info = new ChunkInfo(chunkName, 0, chunkLen);
ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen); info.setChecksumData(checksumData);
chunkList.add(info.getProtoBufMessage()); chunkList.add(info.getProtoBufMessage());
chunkManager chunkManager
.writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData), .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
@ -195,4 +271,13 @@ import static org.junit.Assert.assertTrue;
} }
} }
} }
private static byte[] generateRandomData(int length) {
assertTrue(length % 2 == 0);
ByteArrayOutputStream os = new ByteArrayOutputStream(length);
for (int i = 0; i < length; i++) {
os.write(i % 10);
}
return os.toByteArray();
}
} }

View File

@ -38,11 +38,13 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
new Class[] {OzoneConfigKeys.class, ScmConfigKeys.class, new Class[] {OzoneConfigKeys.class, ScmConfigKeys.class,
OMConfigKeys.class, HddsConfigKeys.class, OMConfigKeys.class, HddsConfigKeys.class,
ReconServerConfigKeys.class, ReconServerConfigKeys.class,
S3GatewayConfigKeys.class}; S3GatewayConfigKeys.class
};
errorIfMissingConfigProps = true; errorIfMissingConfigProps = true;
errorIfMissingXmlProps = true; errorIfMissingXmlProps = true;
xmlPropsToSkipCompare.add("hadoop.tags.custom"); xmlPropsToSkipCompare.add("hadoop.tags.custom");
xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID"); xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
xmlPropsToSkipCompare.add("hdds.containerscrub.enabled");
addPropertiesNotInXml(); addPropertiesNotInXml();
} }

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubber; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerMetadataScanner;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@ -159,8 +159,8 @@ public class TestDataScrubber {
deleteDirectory(chunksDir); deleteDirectory(chunksDir);
Assert.assertFalse(chunksDir.exists()); Assert.assertFalse(chunksDir.exists());
ContainerScrubber sb = new ContainerScrubber(ozoneConfig, ContainerMetadataScanner sb = new ContainerMetadataScanner(
oc.getController()); oc.getController(), 0);
sb.scrub(c); sb.scrub(c);
// wait for the incremental container report to propagate to SCM // wait for the incremental container report to propagate to SCM

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.freon;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -44,7 +45,9 @@ public class TestDataValidateWithDummyContainers
@BeforeClass @BeforeClass
public static void init() throws Exception { public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, false); ContainerScrubberConfiguration sc =
conf.getObject(ContainerScrubberConfiguration.class);
sc.setEnabled(false);
conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA, false); conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA, false);
conf.setBoolean(OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, conf.setBoolean(OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
false); false);