HADOOP-16858. S3Guard fsck: Add option to remove orphaned entries (#1851). Contributed by Gabor Bota.
Adding a new feature to S3GuardTool's fsck: -fix. Change-Id: I2cdb6601fea1d859b54370046b827ef06eb1107d
This commit is contained in:
parent
8d6373483e
commit
c91ff8c18f
|
@ -159,7 +159,9 @@ public class S3GuardFsck {
|
|||
// Create a handler and handle each violated pairs
|
||||
S3GuardFsckViolationHandler handler =
|
||||
new S3GuardFsckViolationHandler(rawFS, metadataStore);
|
||||
comparePairs.forEach(handler::handle);
|
||||
for (ComparePair comparePair : comparePairs) {
|
||||
handler.logError(comparePair);
|
||||
}
|
||||
|
||||
LOG.info("Total scan time: {}s", stopwatch.now(TimeUnit.SECONDS));
|
||||
LOG.info("Scanned entries: {}", scannedItems);
|
||||
|
@ -344,6 +346,31 @@ public class S3GuardFsck {
|
|||
return rawFS.makeQualified(new Path(s));
|
||||
}
|
||||
|
||||
/**
|
||||
* Fix violations found during check.
|
||||
*
|
||||
* Currently only supports handling the following violation:
|
||||
* - Violation.ORPHAN_DDB_ENTRY
|
||||
*
|
||||
* @param violations to be handled
|
||||
* @throws IOException throws the error if there's any during handling
|
||||
*/
|
||||
public void fixViolations(List<ComparePair> violations) throws IOException {
|
||||
S3GuardFsckViolationHandler handler =
|
||||
new S3GuardFsckViolationHandler(rawFS, metadataStore);
|
||||
|
||||
for (ComparePair v : violations) {
|
||||
if (v.getViolations().contains(Violation.ORPHAN_DDB_ENTRY)) {
|
||||
try {
|
||||
handler.doFix(v);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error during handling the violation: ", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A compare pair with the pair of metadata and the list of violations.
|
||||
*/
|
||||
|
@ -542,7 +569,9 @@ public class S3GuardFsck {
|
|||
// Create a handler and handle each violated pairs
|
||||
S3GuardFsckViolationHandler handler =
|
||||
new S3GuardFsckViolationHandler(rawFS, metadataStore);
|
||||
comparePairs.forEach(handler::handle);
|
||||
for (ComparePair comparePair : comparePairs) {
|
||||
handler.logError(comparePair);
|
||||
}
|
||||
|
||||
stopwatch.stop();
|
||||
LOG.info("Total scan time: {}s", stopwatch.now(TimeUnit.SECONDS));
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a.s3guard;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
@ -26,6 +27,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
|
||||
|
@ -43,6 +45,10 @@ public class S3GuardFsckViolationHandler {
|
|||
|
||||
private static String newLine = System.getProperty("line.separator");
|
||||
|
||||
public enum HandleMode {
|
||||
FIX, LOG
|
||||
}
|
||||
|
||||
public S3GuardFsckViolationHandler(S3AFileSystem fs,
|
||||
DynamoDBMetadataStore ddbms) {
|
||||
|
||||
|
@ -50,7 +56,7 @@ public class S3GuardFsckViolationHandler {
|
|||
this.rawFs = fs;
|
||||
}
|
||||
|
||||
public void handle(S3GuardFsck.ComparePair comparePair) {
|
||||
public void logError(S3GuardFsck.ComparePair comparePair) throws IOException {
|
||||
if (!comparePair.containsViolation()) {
|
||||
LOG.debug("There is no violation in the compare pair: {}", comparePair);
|
||||
return;
|
||||
|
@ -60,11 +66,26 @@ public class S3GuardFsckViolationHandler {
|
|||
sB.append(newLine)
|
||||
.append("On path: ").append(comparePair.getPath()).append(newLine);
|
||||
|
||||
handleComparePair(comparePair, sB);
|
||||
handleComparePair(comparePair, sB, HandleMode.LOG);
|
||||
|
||||
LOG.error(sB.toString());
|
||||
}
|
||||
|
||||
public void doFix(S3GuardFsck.ComparePair comparePair) throws IOException {
|
||||
if (!comparePair.containsViolation()) {
|
||||
LOG.debug("There is no violation in the compare pair: {}", comparePair);
|
||||
return;
|
||||
}
|
||||
|
||||
StringBuilder sB = new StringBuilder();
|
||||
sB.append(newLine)
|
||||
.append("On path: ").append(comparePair.getPath()).append(newLine);
|
||||
|
||||
handleComparePair(comparePair, sB, HandleMode.FIX);
|
||||
|
||||
LOG.info(sB.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance of the violation handler for all the violations
|
||||
* found in the compare pair and use it.
|
||||
|
@ -72,16 +93,28 @@ public class S3GuardFsckViolationHandler {
|
|||
* @param comparePair the compare pair with violations
|
||||
* @param sB StringBuilder to append error strings from violations.
|
||||
*/
|
||||
protected static void handleComparePair(S3GuardFsck.ComparePair comparePair,
|
||||
StringBuilder sB) {
|
||||
protected void handleComparePair(S3GuardFsck.ComparePair comparePair,
|
||||
StringBuilder sB, HandleMode handleMode) throws IOException {
|
||||
|
||||
for (S3GuardFsck.Violation violation : comparePair.getViolations()) {
|
||||
try {
|
||||
ViolationHandler handler = violation.getHandler()
|
||||
.getDeclaredConstructor(S3GuardFsck.ComparePair.class)
|
||||
.newInstance(comparePair);
|
||||
|
||||
switch (handleMode) {
|
||||
case FIX:
|
||||
final String errorStr = handler.getError();
|
||||
sB.append(errorStr);
|
||||
break;
|
||||
case LOG:
|
||||
final String fixStr = handler.fixViolation(rawFs, metadataStore);
|
||||
sB.append(fixStr);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unknown handleMode: " + handleMode);
|
||||
}
|
||||
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.error("Can not find declared constructor for handler: {}",
|
||||
violation.getHandler());
|
||||
|
@ -137,6 +170,12 @@ public class S3GuardFsckViolationHandler {
|
|||
public DirListingMetadata getMsDirListing() {
|
||||
return msDirListing;
|
||||
}
|
||||
|
||||
public String fixViolation(S3AFileSystem fs,
|
||||
DynamoDBMetadataStore ddbms) throws IOException {
|
||||
return String.format("Fixing of violation: %s is not supported yet.",
|
||||
this.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -357,6 +396,16 @@ public class S3GuardFsckViolationHandler {
|
|||
public String getError() {
|
||||
return "The DDB entry is orphan - there is no parent in the MS.";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String fixViolation(S3AFileSystem fs, DynamoDBMetadataStore ddbms)
|
||||
throws IOException {
|
||||
final Path path = getPathMetadata().getFileStatus().getPath();
|
||||
ddbms.forgetMetadata(path);
|
||||
return String.format(
|
||||
"Fixing violation by removing metadata entry from the " +
|
||||
"MS on path: %s", path);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.PrintStream;
|
|||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
|
@ -1609,6 +1610,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
|
|||
static class Fsck extends S3GuardTool {
|
||||
public static final String CHECK_FLAG = "check";
|
||||
public static final String DDB_MS_CONSISTENCY_FLAG = "internal";
|
||||
public static final String FIX_FLAG = "fix";
|
||||
|
||||
public static final String NAME = "fsck";
|
||||
public static final String PURPOSE = "Compares S3 with MetadataStore, and "
|
||||
|
@ -1618,12 +1620,17 @@ public abstract class S3GuardTool extends Configured implements Tool,
|
|||
"\t" + PURPOSE + "\n\n" +
|
||||
"Common options:\n" +
|
||||
" -" + CHECK_FLAG + " Check the metadata store for errors, but do "
|
||||
+ "not fix any issues.\n"+
|
||||
+ "not fix any issues.\n" +
|
||||
" -" + DDB_MS_CONSISTENCY_FLAG + " Check the dynamodb metadata store "
|
||||
+ "for internal consistency.\n";
|
||||
+ "for internal consistency.\n" +
|
||||
" -" + FIX_FLAG + " Fix the errors found in the metadata store. Can " +
|
||||
"be used with " + CHECK_FLAG + " or " + DDB_MS_CONSISTENCY_FLAG + " flags. "
|
||||
+ "\n\t\tFixes: \n" +
|
||||
"\t\t\t- Remove orphan entries from DDB." +
|
||||
"\n";
|
||||
|
||||
Fsck(Configuration conf) {
|
||||
super(conf, CHECK_FLAG, DDB_MS_CONSISTENCY_FLAG);
|
||||
super(conf, CHECK_FLAG, DDB_MS_CONSISTENCY_FLAG, FIX_FLAG);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1648,17 +1655,19 @@ public abstract class S3GuardTool extends Configured implements Tool,
|
|||
final CommandFormat commandFormat = getCommandFormat();
|
||||
|
||||
// check if there's more than one arguments
|
||||
int flags = 0;
|
||||
if (commandFormat.getOpt(CHECK_FLAG)) {
|
||||
flags++;
|
||||
}
|
||||
if (commandFormat.getOpt(DDB_MS_CONSISTENCY_FLAG)) {
|
||||
flags++;
|
||||
}
|
||||
// from CHECK and INTERNAL CONSISTENCY
|
||||
int flags = countTrue(commandFormat.getOpt(CHECK_FLAG),
|
||||
commandFormat.getOpt(DDB_MS_CONSISTENCY_FLAG));
|
||||
if (flags > 1) {
|
||||
out.println(USAGE);
|
||||
throw invalidArgs("There should be only one parameter used for checking.");
|
||||
}
|
||||
if (flags == 0 && commandFormat.getOpt(FIX_FLAG)) {
|
||||
errorln(FIX_FLAG + " flag can be used with either " + CHECK_FLAG + " or " +
|
||||
DDB_MS_CONSISTENCY_FLAG + " flag, but not alone.");
|
||||
errorln(USAGE);
|
||||
return ERROR;
|
||||
}
|
||||
|
||||
String s3Path = paths.get(0);
|
||||
try {
|
||||
|
@ -1707,6 +1716,11 @@ public abstract class S3GuardTool extends Configured implements Tool,
|
|||
return ERROR;
|
||||
}
|
||||
|
||||
if (commandFormat.getOpt(FIX_FLAG)) {
|
||||
S3GuardFsck s3GuardFsck = new S3GuardFsck(fs, ms);
|
||||
s3GuardFsck.fixViolations(violations);
|
||||
}
|
||||
|
||||
out.flush();
|
||||
|
||||
// We fail if there were compare pairs, as the returned compare pairs
|
||||
|
@ -1716,6 +1730,10 @@ public abstract class S3GuardTool extends Configured implements Tool,
|
|||
}
|
||||
return exitValue;
|
||||
}
|
||||
|
||||
int countTrue(Boolean... bools) {
|
||||
return (int) Arrays.stream(bools).filter(p -> p).count();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Audits a DynamoDB S3Guard repository for all the entries being
|
||||
|
|
|
@ -1165,7 +1165,7 @@ Compares S3 with MetadataStore, and returns a failure status if any
|
|||
rules or invariants are violated. Only works with DynamoDB metadata stores.
|
||||
|
||||
```bash
|
||||
hadoop s3guard fsck [-check | -internal] (s3a://BUCKET | s3a://PATH_PREFIX)
|
||||
hadoop s3guard fsck [-check | -internal] [-fix] (s3a://BUCKET | s3a://PATH_PREFIX)
|
||||
```
|
||||
|
||||
`-check` operation checks the metadata store from the S3 perspective, but
|
||||
|
@ -1175,6 +1175,12 @@ The consistency issues will be logged in ERROR loglevel.
|
|||
`-internal` operation checks the internal consistency of the metadata store,
|
||||
but does not fix any issues.
|
||||
|
||||
`-fix` operation fixes consistency issues between the metadatastore and the S3
|
||||
bucket. This parameter is optional, and can be used together with check or
|
||||
internal parameters, but not alone.
|
||||
The following fix is implemented:
|
||||
- Remove orphan entries from DDB
|
||||
|
||||
The errors found will be logged at the ERROR log level.
|
||||
|
||||
*Note*: `-check` and `-internal` operations can be used only as separate
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.net.URI;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Before;
|
||||
|
@ -499,6 +500,19 @@ public class ITestS3GuardFsck extends AbstractS3ATestBase {
|
|||
assertComparePairsSize(comparePairs, 1);
|
||||
checkForViolationInPairs(file, comparePairs,
|
||||
S3GuardFsck.Violation.ORPHAN_DDB_ENTRY);
|
||||
|
||||
// fix the violation
|
||||
s3GuardFsck.fixViolations(
|
||||
comparePairs.stream().filter(cP -> cP.getViolations()
|
||||
.contains(S3GuardFsck.Violation.ORPHAN_DDB_ENTRY))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
|
||||
// assert that the violation is fixed
|
||||
final List<S3GuardFsck.ComparePair> fixedComparePairs =
|
||||
s3GuardFsck.checkDdbInternalConsistency(cwd);
|
||||
checkNoViolationInPairs(file, fixedComparePairs,
|
||||
S3GuardFsck.Violation.ORPHAN_DDB_ENTRY);
|
||||
} finally {
|
||||
cleanup(file, cwd);
|
||||
}
|
||||
|
@ -596,14 +610,27 @@ public class ITestS3GuardFsck extends AbstractS3ATestBase {
|
|||
.contains(violation);
|
||||
}
|
||||
|
||||
private void checkNoViolationInPairs(Path file2,
|
||||
/**
|
||||
* Check that there is no violation in the pair provided.
|
||||
*
|
||||
* @param file the path to filter to in the comparePairs list.
|
||||
* @param comparePairs the list to validate.
|
||||
* @param violation the violation that should not be in the list.
|
||||
*/
|
||||
private void checkNoViolationInPairs(Path file,
|
||||
List<S3GuardFsck.ComparePair> comparePairs,
|
||||
S3GuardFsck.Violation violation) {
|
||||
final S3GuardFsck.ComparePair file2Pair = comparePairs.stream()
|
||||
.filter(p -> p.getPath().equals(file2))
|
||||
|
||||
if (comparePairs.size() == 0) {
|
||||
LOG.info("Compare pairs is empty, so there's no violation. (As expected.)");
|
||||
return;
|
||||
}
|
||||
|
||||
final S3GuardFsck.ComparePair comparePair = comparePairs.stream()
|
||||
.filter(p -> p.getPath().equals(file))
|
||||
.findFirst().get();
|
||||
assertNotNull("The pair should not be null.", file2Pair);
|
||||
Assertions.assertThat(file2Pair.getViolations())
|
||||
assertNotNull("The pair should not be null.", comparePair);
|
||||
Assertions.assertThat(comparePair.getViolations())
|
||||
.describedAs("Violations in the pair")
|
||||
.doesNotContain(violation);
|
||||
}
|
||||
|
|
|
@ -354,6 +354,40 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
|
|||
"s3a://" + getFileSystem().getBucket()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCLIFsckDDbFixOnlyFails() throws Exception {
|
||||
describe("This test serves the purpose to run fsck with the correct " +
|
||||
"parameters, so there will be no exception thrown.");
|
||||
final int result = run(S3GuardTool.Fsck.NAME,
|
||||
"-" + Fsck.FIX_FLAG,
|
||||
"s3a://" + getFileSystem().getBucket());
|
||||
LOG.info("The return value of the run: {}", result);
|
||||
assertEquals(ERROR, result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the fix flag is accepted by the fsck.
|
||||
*
|
||||
* Note that we don't have an assert at the end of this test because
|
||||
* there maybe some errors found during the check and the returned value
|
||||
* will be ERROR and not SUCCESS. So if we assert on SUCCESS, then the test
|
||||
* could (likely) to be flaky.
|
||||
* If the FIX_FLAG parameter is not accepted here an exception will be thrown
|
||||
* so the test will break.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCLIFsckDDbFixAndInternalSucceed() throws Exception {
|
||||
describe("This test serves the purpose to run fsck with the correct " +
|
||||
"parameters, so there will be no exception thrown.");
|
||||
final int result = run(S3GuardTool.Fsck.NAME,
|
||||
"-" + Fsck.FIX_FLAG,
|
||||
"-" + Fsck.DDB_MS_CONSISTENCY_FLAG,
|
||||
"s3a://" + getFileSystem().getBucket());
|
||||
LOG.info("The return value of the run: {}", result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when init, the CMK option can not live without SSE enabled.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue