Fix durable storage cleanup (#13853)

This commit is contained in:
Rohan Garg 2023-03-06 09:49:14 +05:30 committed by GitHub
parent a580aca551
commit f33898ed6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 20 deletions

View File

@ -822,25 +822,24 @@ public class WorkerImpl implements Worker
continue; continue;
} }
output.close(); output.close();
}
// One caveat with this approach is that in case of a worker crash, while the MM/Indexer systems will delete their // One caveat with this approach is that in case of a worker crash, while the MM/Indexer systems will delete their
// temp directories where intermediate results were stored, it won't be the case for the external storage. // temp directories where intermediate results were stored, it won't be the case for the external storage.
// Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external. // Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external.
// We currently take care of this in the controller. // We currently take care of this in the controller.
if (durableStageStorageEnabled && removeDurableStorageFiles) { if (durableStageStorageEnabled && removeDurableStorageFiles) {
final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName( final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName(
task.getControllerTaskId(), task.getControllerTaskId(),
stageId.getStageNumber(), stageId.getStageNumber(),
task.getWorkerNumber(), task.getWorkerNumber(),
task.getId() task.getId()
); );
try { try {
MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName); MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
} }
catch (Exception e) { catch (Exception e) {
// If an error is thrown while cleaning up a file, log it and try to continue with the cleanup // If an error is thrown while cleaning up a file, log it and try to continue with the cleanup
log.warn(e, "Error while cleaning up folder at path " + folderName); log.warn(e, "Error while cleaning up folder at path " + folderName);
}
} }
} }
} }

View File

@ -121,6 +121,7 @@ public class S3StorageConnector implements StorageConnector
} else { } else {
readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength(); readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
} }
AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
// build a sequence input stream from chunks // build a sequence input stream from chunks
return new SequenceInputStream(new Enumeration<InputStream>() return new SequenceInputStream(new Enumeration<InputStream>()
@ -128,6 +129,12 @@ public class S3StorageConnector implements StorageConnector
@Override @Override
public boolean hasMoreElements() public boolean hasMoreElements()
{ {
// checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
// SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
// lazily, we don't need to close them.
if (isSequenceStreamClosed.get()) {
return false;
}
// don't stop until the whole object is downloaded // don't stop until the whole object is downloaded
return currReadStart.get() < readEnd; return currReadStart.get() < readEnd;
} }
@ -212,7 +219,15 @@ public class S3StorageConnector implements StorageConnector
throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile)); throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
} }
} }
}); })
{
@Override
public void close() throws IOException
{
isSequenceStreamClosed.set(true);
super.close();
}
};
} }
@Override @Override