From e6b54f7f683a29f4d457f1bd1caf8ba2620b36f5 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 24 May 2023 19:21:40 +0100
Subject: [PATCH] Revert "HADOOP-18706. Improve S3ABlockOutputStream recovery
 (#5563)"

This reverts commit 372631c5667b02b5c9f280ab4c09a3d71a7ee36d.

Reverted due to HADOOP-18744.
---
 .../hadoop/fs/s3a/S3ABlockOutputStream.java   | 13 +-----
 .../apache/hadoop/fs/s3a/S3ADataBlocks.java   | 31 +++++--------
 .../hadoop/fs/s3a/WriteOperationHelper.java   |  1 -
 .../apache/hadoop/fs/s3a/WriteOperations.java |  7 ---
 .../fs/s3a/ITestS3ABlockOutputArray.java      | 45 +------------------
 .../apache/hadoop/fs/s3a/TestDataBlocks.java  |  2 +-
 .../fs/s3a/TestS3ABlockOutputStream.java      | 11 -----
 7 files changed, 14 insertions(+), 96 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 2febc87aec3..43a2b7e0dbd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -232,9 +232,7 @@ class S3ABlockOutputStream extends OutputStream implements
       LOG.error("Number of partitions in stream exceeds limit for S3: "
           + Constants.MAX_MULTIPART_COUNT + " write may fail.");
     }
-    activeBlock = blockFactory.create(
-        writeOperationHelper.getAuditSpan().getSpanId(),
-        key, blockCount, this.blockSize, statistics);
+    activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
   }
   return activeBlock;
 }
@@ -730,14 +728,8 @@ class S3ABlockOutputStream extends OutputStream implements

   /**
    * Shared processing of Syncable operation reporting/downgrade.
-   *
-   * Syncable API is not supported, so calls to hsync/hflush will throw an
-   * UnsupportedOperationException unless the stream was constructed with
-   * {@link #downgradeSyncableExceptions} set to true, in which case the stream is flushed.
-   * @throws IOException IO Problem
-   * @throws UnsupportedOperationException if downgrade syncable exceptions is set to false
    */
-  private void handleSyncableInvocation() throws IOException {
+  private void handleSyncableInvocation() {
     final UnsupportedOperationException ex =
         new UnsupportedOperationException(E_NOT_SYNCABLE);
     if (!downgradeSyncableExceptions) {
@@ -749,7 +741,6 @@ class S3ABlockOutputStream extends OutputStream implements
         key);
     // and log at debug
     LOG.debug("Downgrading Syncable call", ex);
-    flush();
   }

   @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
index 2299892b35b..b20d8e859aa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -175,14 +175,12 @@ final class S3ADataBlocks {
     /**
      * Create a block.
      *
-     * @param spanId id of the audit span
-     * @param key key of s3 object being written to
      * @param index index of block
      * @param limit limit of the block.
      * @param statistics stats to work with
      * @return a new block.
      */
-    abstract DataBlock create(String spanId, String key, long index, long limit,
+    abstract DataBlock create(long index, long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException;

@@ -393,11 +391,11 @@
     }

     @Override
-    DataBlock create(String spanId, String key, long index, long limit,
+    DataBlock create(long index, long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException {
       Preconditions.checkArgument(limit > 0,
-          "Invalid block size: %d [%s]", limit, key);
+          "Invalid block size: %d", limit);
       return new ByteArrayBlock(0, limit, statistics);
     }

@@ -518,11 +516,11 @@
     }

     @Override
-    ByteBufferBlock create(String spanId, String key, long index, long limit,
+    ByteBufferBlock create(long index, long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException {
       Preconditions.checkArgument(limit > 0,
-          "Invalid block size: %d [%s]", limit, key);
+          "Invalid block size: %d", limit);
       return new ByteBufferBlock(index, limit, statistics);
     }

@@ -800,8 +798,6 @@
    * Buffer blocks to disk.
    */
   static class DiskBlockFactory extends BlockFactory {
-    private static final String ESCAPED_FORWARD_SLASH = "EFS";
-    private static final String ESCAPED_BACKSLASH = "EBS";

     DiskBlockFactory(S3AFileSystem owner) {
       super(owner);
@@ -810,8 +806,6 @@
     /**
      * Create a temp file and a {@link DiskBlock} instance to manage it.
      *
-     * @param spanId id of the audit span
-     * @param key of the s3 object being written
      * @param index block index
      * @param limit limit of the block. -1 means "no limit"
      * @param statistics statistics to update
@@ -819,22 +813,17 @@
      * @throws IOException IO problems
      */
     @Override
-    DataBlock create(String spanId, String key, long index,
+    DataBlock create(long index,
         long limit,
         BlockOutputStreamStatistics statistics)
         throws IOException {
       Preconditions.checkArgument(limit != 0,
-          "Invalid block size: %d [%s]", limit, key);
-      String prefix = String.format("s3ablock-%04d-%s-%s-", index, spanId, escapeS3Key(key));
-      File destFile = getOwner().createTmpFileForWrite(prefix, limit, getOwner().getConf());
+          "Invalid block size: %d", limit);
+      File destFile = getOwner()
+          .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
+              limit, getOwner().getConf());
       return new DiskBlock(destFile, limit, index, statistics);
     }
-
-    protected static String escapeS3Key(String key) {
-      return key
-          .replace("\\", ESCAPED_BACKSLASH)
-          .replace("/", ESCAPED_FORWARD_SLASH);
-    }
   }

   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 3f42d2caf4a..8e15a10944c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -217,7 +217,6 @@ public class WriteOperationHelper implements WriteOperations {
    * Get the audit span this object was created with.
    * @return the audit span
    */
-  @Override
   public AuditSpan getAuditSpan() {
     return auditSpan;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 6dd833761ec..1c3d3688575 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
-import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;

@@ -306,12 +305,6 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
    */
   Configuration getConf();

-  /**
-   * Get the audit span this object was created with.
-   * @return the audit span
-   */
-  AuditSpan getAuditSpan();
-
   /**
    * Create a S3 Select request for the destination path.
    * This does not build the query.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
index bfeaa25f16c..53fa0d83b55 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -29,12 +29,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;

-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import java.util.Arrays;
-import java.util.Objects;

 import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -82,46 +79,6 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
     verifyUpload("regular", 1024);
   }

-  /**
-   * Test that the DiskBlock's local file doesn't result in error when the S3 key exceeds the max
-   * char limit of the local file system. Currently
-   * {@link java.io.File#createTempFile(String, String, File)} is being relied on to handle the
-   * truncation.
-   * @throws IOException
-   */
-  @Test
-  public void testDiskBlockCreate() throws IOException {
-    String s3Key = // 1024 char
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key__very_long_s3_key__very_long_s3_key__very_long_s3_key__" +
-        "very_long_s3_key";
-    long blockSize = getFileSystem().getDefaultBlockSize();
-    try (S3ADataBlocks.BlockFactory diskBlockFactory =
-             new S3ADataBlocks.DiskBlockFactory(getFileSystem());
-         S3ADataBlocks.DataBlock dataBlock =
-             diskBlockFactory.create("spanId", s3Key, 1, blockSize, null);
-    ) {
-      String tmpDir = getConfiguration().get("hadoop.tmp.dir");
-      boolean created = Arrays.stream(
-          Objects.requireNonNull(new File(tmpDir).listFiles()))
-          .anyMatch(f -> f.getName().contains("very_long_s3_key"));
-      assertTrue(String.format("tmp file should have been created locally in %s", tmpDir), created);
-      LOG.info(dataBlock.toString()); // block file name/location can be viewed in failsafe-report
-    }
-  }
-
   @Test(expected = IOException.class)
   public void testWriteAfterStreamClose() throws Throwable {
     Path dest = path("testWriteAfterStreamClose");
@@ -179,7 +136,7 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
         new S3AInstrumentation(new URI("s3a://example"));
     BlockOutputStreamStatistics outstats
         = instrumentation.newOutputStreamStatistics(null);
-    S3ADataBlocks.DataBlock block = factory.create("spanId", "object/key", 1, BLOCK_SIZE, outstats);
+    S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
     block.write(dataset, 0, dataset.length);
     S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
     InputStream stream = uploadData.getUploadStream();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
index d2ea0218acd..700ef5ced3d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
@@ -51,7 +51,7 @@ public class TestDataBlocks extends Assert {
         new S3ADataBlocks.ByteBufferBlockFactory(null)) {
       int limit = 128;
       S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
-          = factory.create("spanId", "s3\\object/key", 1, limit, null);
+          = factory.create(1, limit, null);
       assertOutstandingBuffers(factory, 1);

       byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
index beeeb39fc48..ffa2c81e58a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks;
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
-import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,10 +38,7 @@ import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 /**
@@ -63,9 +59,6 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
         mock(S3ADataBlocks.BlockFactory.class);
     long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
     WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
-    AuditSpan auditSpan = mock(AuditSpan.class);
-    when(auditSpan.getSpanId()).thenReturn("spanId");
-    when(oHelper.getAuditSpan()).thenReturn(auditSpan);
     PutTracker putTracker = mock(PutTracker.class);
     final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
         S3ABlockOutputStream.builder()
@@ -163,7 +156,6 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
     stream = spy(new S3ABlockOutputStream(builder));
     intercept(UnsupportedOperationException.class, () -> stream.hflush());
     intercept(UnsupportedOperationException.class, () -> stream.hsync());
-    verify(stream, never()).flush();
   }

   /**
@@ -177,11 +169,8 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
     builder.withDowngradeSyncableExceptions(true);
     stream = spy(new S3ABlockOutputStream(builder));

-    verify(stream, never()).flush();
     stream.hflush();
-    verify(stream, times(1)).flush();
     stream.hsync();
-    verify(stream, times(2)).flush();
   }

 }
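
Context for readers comparing the two sides of this revert (not part of the
patch itself): below is a minimal, standalone Java sketch of the buffer-file
naming that the reverted change (HADOOP-18706) had introduced, contrasted with
the naming this revert restores. Only the two format strings and the
slash-escaping rules are taken from the diff above; the class name and the
sample span id and key values are hypothetical.

public class BlockPrefixSketch {
  // Escaping rules as in the reverted DiskBlockFactory.escapeS3Key():
  // path separators cannot appear in a local file name, so they are
  // replaced with fixed markers.
  private static final String ESCAPED_FORWARD_SLASH = "EFS";
  private static final String ESCAPED_BACKSLASH = "EBS";

  static String escapeS3Key(String key) {
    return key
        .replace("\\", ESCAPED_BACKSLASH)
        .replace("/", ESCAPED_FORWARD_SLASH);
  }

  public static void main(String[] args) {
    long index = 1;
    String spanId = "span-0001";                      // hypothetical value
    String key = "warehouse/sales/part-0001.parquet"; // hypothetical value

    // Reverted scheme: audit span id and escaped key in the prefix, so
    // buffer files left on disk could be traced back to the upload that
    // wrote them.
    System.out.println(String.format("s3ablock-%04d-%s-%s-",
        index, spanId, escapeS3Key(key)));
    // -> s3ablock-0001-span-0001-warehouseEFSsalesEFSpart-0001.parquet-

    // Scheme restored by this revert: block index only.
    System.out.println(String.format("s3ablock-%04d-", index));
    // -> s3ablock-0001-
  }
}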