diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index 2f5b007fe63..bb492f2d9f7 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -104,7 +104,7 @@ public class ScanBenchmark
   @Param({"2"})
   private int numProcessingThreads;

-  @Param({"750000"})
+  @Param({"200000"})
   private int rowsPerSegment;

   @Param({"basic.A"})
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index c63eb41994e..639bb45cce2 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -536,6 +536,7 @@ This deep storage doesn't do anything. There are no configs.
 #### S3 Deep Storage

 This deep storage is used to interface with Amazon's S3. Note that the `druid-s3-extensions` extension must be loaded.
+The table below lists the most important S3 configurations. See [S3 Deep Storage](../development/extensions-core/s3.html) for the full list.

 |Property|Description|Default|
 |--------|-----------|-------|
@@ -543,7 +544,7 @@ This deep storage is used to interface with Amazon's S3. Note that the `druid-s3
 |`druid.s3.secretKey`|The secret key to use to access S3.|none|
 |`druid.storage.bucket`|S3 bucket name.|none|
 |`druid.storage.baseKey`|S3 object key prefix for storage.|none|
-|`druid.storage.disableAcl`|Boolean flag for ACL.|false|
+|`druid.storage.disableAcl`|Boolean flag for ACL. If this is set to `false`, full control of the object will be granted to the bucket owner, which may require additional permissions. See [S3 permissions settings](../development/extensions-core/s3.html#s3-permissions-settings).|false|
 |`druid.storage.archiveBucket`|S3 bucket name for archiving when running the *archive task*.|none|
 |`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|
 |`druid.storage.useS3aSchema`|If true, use the "s3a" filesystem when using Hadoop-based ingestion. If false, the "s3n" filesystem will be used. Only affects Hadoop-based ingestion.|false|
diff --git a/docs/content/development/extensions-core/s3.md b/docs/content/development/extensions-core/s3.md
index cead6cdc687..53e5df92cd1 100644
--- a/docs/content/development/extensions-core/s3.md
+++ b/docs/content/development/extensions-core/s3.md
@@ -45,6 +45,7 @@ As an example, to set the region to 'us-east-1' through system properties:
 |`druid.s3.secretKey`|S3 secret key.|Must be set.|
 |`druid.storage.bucket`|Bucket to store in.|Must be set.|
 |`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.|
+|`druid.storage.disableAcl`|Boolean flag to disable ACL. If this is set to `false`, full control of the object will be granted to the bucket owner, which may require additional permissions. See [S3 permissions settings](#s3-permissions-settings).|false|
 |`druid.storage.sse.type`|Server-side encryption type. Should be one of `s3`, `kms`, and `custom`. See the below [Server-side encryption section](#server-side-encryption) for more details.|None|
 |`druid.storage.sse.kms.keyId`|AWS KMS key ID. Can be empty if `druid.storage.sse.type` is `kms`.|None|
 |`druid.storage.sse.custom.base64EncodedKey`|Base64-encoded key. Should be specified if `druid.storage.sse.type` is `custom`.|None|
@@ -59,6 +60,11 @@ As an example, to set the region to 'us-east-1' through system properties:
 |`druid.s3.proxy.username`|User name to use when connecting through a proxy.|None|
 |`druid.s3.proxy.password`|Password to use when connecting through a proxy.|None|

+### S3 permissions settings
+
+`s3:GetObject` and `s3:PutObject` are required for pushing segments to and loading segments from S3.
+If `druid.storage.disableAcl` is set to `false`, then `s3:GetBucketAcl` and `s3:PutObjectAcl` are additionally required to set the ACL on uploaded objects.
+
 ## Server-side encryption

 You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html) by setting
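The docs above name four permissions. To make concrete why the two ACL-related permissions only come into play when `druid.storage.disableAcl` is `false`, here is a minimal sketch of an owner-full-control upload against the AWS SDK for Java v1; the class and helper names are illustrative and not part of this patch:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.AccessControlList;
    import com.amazonaws.services.s3.model.CanonicalGrantee;
    import com.amazonaws.services.s3.model.Grant;
    import com.amazonaws.services.s3.model.Permission;
    import com.amazonaws.services.s3.model.PutObjectRequest;

    import java.io.File;

    public class S3AclExample
    {
      // Upload a file while granting FullControl on the object to the bucket owner.
      public static void uploadWithOwnerFullControl(AmazonS3 s3, String bucket, String key, File file)
      {
        // Reading the bucket ACL to discover the owner requires s3:GetBucketAcl.
        final AccessControlList acl = s3.getBucketAcl(bucket);
        acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
        // Writing an object with an explicit ACL attached requires s3:PutObjectAcl
        // in addition to s3:PutObject.
        s3.putObject(new PutObjectRequest(bucket, key, file).withAccessControlList(acl));
      }
    }

With `druid.storage.disableAcl=true` the ACL steps are skipped, so plain `s3:GetObject`/`s3:PutObject` suffice.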
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
index 4fd54b49956..84a76d15e41 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java
@@ -174,7 +174,10 @@ public class S3DataSegmentMover implements DataSegmentMover
               .withPrefix(s3Path)
               .withMaxKeys(1)
       );
-      if (listResult.getKeyCount() == 0) {
+      // Check getObjectSummaries().size() instead of getKeyCount(): in some
+      // cases the list result contains object summaries even though
+      // getKeyCount() still returns zero.
+      if (listResult.getObjectSummaries().size() == 0) {
         // should never happen
         throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
       }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index e0a3dac8cae..97858864b14 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -251,7 +251,10 @@ public class S3Utils
         .withMaxKeys(1);

     final ListObjectsV2Result result = s3Client.listObjectsV2(request);
-    if (result.getKeyCount() == 0) {
+    // Check getObjectSummaries().size() instead of getKeyCount(): in some
+    // cases the list result contains object summaries even though
+    // getKeyCount() still returns zero.
+    if (result.getObjectSummaries().size() == 0) {
       throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
     }
     final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);
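Both call sites above apply the same workaround. A standalone sketch of the pattern, assuming the AWS SDK for Java v1 (the `S3ListingExample` class and `objectExistsUnderPrefix` helper are hypothetical names, not Druid code):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.ListObjectsV2Request;
    import com.amazonaws.services.s3.model.ListObjectsV2Result;

    public class S3ListingExample
    {
      // Returns true if at least one object exists under the given prefix.
      // Deliberately inspects the returned summaries instead of trusting
      // getKeyCount(), which has been observed to report zero even when
      // summaries are present.
      public static boolean objectExistsUnderPrefix(AmazonS3 s3, String bucket, String prefix)
      {
        final ListObjectsV2Result result = s3.listObjectsV2(
            new ListObjectsV2Request()
                .withBucketName(bucket)
                .withPrefix(prefix)
                .withMaxKeys(1)
        );
        return !result.getObjectSummaries().isEmpty();
      }
    }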
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 3efbd8d0074..2929229b4c5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -19,6 +19,7 @@

 package org.apache.druid.sql.calcite.schema;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
@@ -402,7 +403,8 @@ public class DruidSchema extends AbstractSchema
     }
   }

-  private void removeSegment(final DataSegment segment)
+  @VisibleForTesting
+  protected void removeSegment(final DataSegment segment)
   {
     synchronized (lock) {
       log.debug("Segment[%s] is gone.", segment.getId());
@@ -450,7 +452,8 @@ public class DruidSchema extends AbstractSchema
    * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
    * which may be a subset of the asked-for set.
    */
-  private Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
+  @VisibleForTesting
+  protected Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
   {
     final Set<DataSegment> retVal = new HashSet<>();

@@ -506,15 +509,26 @@ public class DruidSchema extends AbstractSchema

         log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
         final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource());
-        SegmentMetadataHolder holder = dataSourceSegments.get(segment);
-        SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
-            .from(holder)
-            .withRowSignature(rowSignature)
-            .withNumRows(analysis.getNumRows())
-            .build();
-        dataSourceSegments.put(segment, updatedHolder);
-        setSegmentSignature(segment, updatedHolder);
-        retVal.add(segment);
+        if (dataSourceSegments == null) {
+          log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
+        } else {
+          SegmentMetadataHolder holder = dataSourceSegments.get(segment);
+          if (holder == null) {
+            log.warn(
+                "No segment[%s] found, skipping refresh",
+                segment.getId()
+            );
+          } else {
+            SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
+                .from(holder)
+                .withRowSignature(rowSignature)
+                .withNumRows(analysis.getNumRows())
+                .build();
+            dataSourceSegments.put(segment, updatedHolder);
+            setSegmentSignature(segment, updatedHolder);
+            retVal.add(segment);
+          }
+        }
       }
     }

@@ -628,7 +642,7 @@ public class DruidSchema extends AbstractSchema
     return rowSignatureBuilder.build();
   }

-  public Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
+  Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
   {
     final Map<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<>();
     synchronized (lock) {
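For context on the guards introduced above: `removeSegment` can drop a datasource's segment map, or an individual holder, between the moment `refreshSegments` snapshots its input set and the moment the segment-metadata results arrive, so both lookups can legitimately return null. A toy sketch of that failure mode, using illustrative names rather than Druid's:

    import java.util.HashMap;
    import java.util.Map;

    public class RefreshRaceExample
    {
      // Illustrative stand-in for DruidSchema's segmentMetadataInfo:
      // datasource name -> (segment id -> metadata).
      private final Map<String, Map<String, String>> metadataByDataSource = new HashMap<>();

      void removeDataSource(String dataSource)
      {
        metadataByDataSource.remove(dataSource);
      }

      void refresh(String dataSource, String segmentId)
      {
        final Map<String, String> segments = metadataByDataSource.get(dataSource);
        // Without this check, a removal that raced ahead of the refresh would
        // make segments.get(segmentId) throw a NullPointerException.
        if (segments == null) {
          return; // skip the refresh, mirroring the log-and-skip behavior above
        }
        final String holder = segments.get(segmentId);
        if (holder == null) {
          return; // the individual segment was removed; skip as well
        }
        segments.put(segmentId, holder + " (refreshed)");
      }
    }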
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 1b4e00819bb..30b99e59ab4 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -63,6 +63,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 public class DruidSchemaTest extends CalciteTestBase
 {
@@ -237,4 +238,41 @@ public class DruidSchemaTest extends CalciteTestBase
     Assert.assertEquals("m1", fields.get(2).getName());
     Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
   }
+
+  @Test
+  public void testNullDatasource() throws IOException
+  {
+    Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
+    Set<DataSegment> segments = segmentMetadatas.keySet();
+    Assert.assertEquals(3, segments.size());
+    // segments contains two segments with datasource "foo" and one with datasource "foo2".
+    // Remove the only segment with datasource "foo2".
+    final DataSegment segmentToRemove =
+        segments.stream().filter(segment -> segment.getDataSource().equals("foo2")).findFirst().orElse(null);
+    Assert.assertNotNull(segmentToRemove);
+    schema.removeSegment(segmentToRemove);
+    // Throws an NPE without the dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource.
+    schema.refreshSegments(segments);
+    segmentMetadatas = schema.getSegmentMetadata();
+    segments = segmentMetadatas.keySet();
+    Assert.assertEquals(2, segments.size());
+  }
+
+  @Test
+  public void testNullSegmentMetadataHolder() throws IOException
+  {
+    Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
+    Set<DataSegment> segments = segmentMetadatas.keySet();
+    Assert.assertEquals(3, segments.size());
+    // Remove one of the segments with datasource "foo".
+    final DataSegment segmentToRemove =
+        segments.stream().filter(segment -> segment.getDataSource().equals("foo")).findFirst().orElse(null);
+    Assert.assertNotNull(segmentToRemove);
+    schema.removeSegment(segmentToRemove);
+    // Throws an NPE without the holder null check in DruidSchema#refreshSegmentsForDataSource.
+    schema.refreshSegments(segments);
+    segmentMetadatas = schema.getSegmentMetadata();
+    segments = segmentMetadatas.keySet();
+    Assert.assertEquals(2, segments.size());
+  }
 }