Merge branch '6088-Create-Scan-Benchmark' into 6088-Time-Ordering-On-Scans-V2

This commit is contained in:
Justin Borromeo 2019-02-06 13:26:17 -08:00
commit 148939e88b
7 changed files with 78 additions and 16 deletions

View File

@ -104,7 +104,7 @@ public class ScanBenchmark
@Param({"2"})
private int numProcessingThreads;
@Param({"750000"})
@Param({"200000"})
private int rowsPerSegment;
@Param({"basic.A"})

View File

@ -536,6 +536,7 @@ This deep storage doesn't do anything. There are no configs.
#### S3 Deep Storage
This deep storage is used to interface with Amazon's S3. Note that the `druid-s3-extensions` extension must be loaded.
The below table shows some important configurations for S3. See [S3 Deep Storage](../development/extensions-core/s3.html) for full configurations.
|Property|Description|Default|
|--------|-----------|-------|
@ -543,7 +544,7 @@ This deep storage is used to interface with Amazon's S3. Note that the `druid-s3
|`druid.s3.secretKey`|The secret key to use to access S3.|none|
|`druid.storage.bucket`|S3 bucket name.|none|
|`druid.storage.baseKey`|S3 object key prefix for storage.|none|
|`druid.storage.disableAcl`|Boolean flag for ACL.|false|
|`druid.storage.disableAcl`|Boolean flag for ACL. If this is set to `false`, the full control would be granted to the bucket owner. This may require to set additional permissions. See [S3 permissions settings](../development/extensions-core/s3.html#s3-permissions-settings).|false|
|`druid.storage.archiveBucket`|S3 bucket name for archiving when running the *archive task*.|none|
|`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|
|`druid.storage.useS3aSchema`|If true, use the "s3a" filesystem when using Hadoop-based ingestion. If false, the "s3n" filesystem will be used. Only affects Hadoop-based ingestion.|false|

View File

@ -45,6 +45,7 @@ As an example, to set the region to 'us-east-1' through system properties:
|`druid.s3.secretKey`|S3 secret key.|Must be set.|
|`druid.storage.bucket`|Bucket to store in.|Must be set.|
|`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.|
|`druid.storage.disableAcl`|Boolean flag to disable ACL. If this is set to `false`, the full control would be granted to the bucket owner. This may require to set additional permissions. See [S3 permissions settings](#s3-permissions-settings).|false|
|`druid.storage.sse.type`|Server-side encryption type. Should be one of `s3`, `kms`, and `custom`. See the below [Server-side encryption section](#server-side-encryption) for more details.|None|
|`druid.storage.sse.kms.keyId`|AWS KMS key ID. Can be empty if `druid.storage.sse.type` is `kms`.|None|
|`druid.storage.sse.custom.base64EncodedKey`|Base64-encoded key. Should be specified if `druid.storage.sse.type` is `custom`.|None|
@ -59,6 +60,11 @@ As an example, to set the region to 'us-east-1' through system properties:
|`druid.s3.proxy.username`|User name to use when connecting through a proxy.|None|
|`druid.s3.proxy.password`|Password to use when connecting through a proxy.|None|
### S3 permissions settings
`s3:GetObject` and `s3:PutObject` are basically required for pushing/loading segments to/from S3.
If `druid.storage.disableAcl` is set to `false`, then `s3:GetBucketAcl` and `s3:PutObjectAcl` are additionally required to set ACL for objects.
## Server-side encryption
You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html) by setting

View File

@ -174,7 +174,10 @@ public class S3DataSegmentMover implements DataSegmentMover
.withPrefix(s3Path)
.withMaxKeys(1)
);
if (listResult.getKeyCount() == 0) {
// Using getObjectSummaries().size() instead of getKeyCount as, in some cases
// it is observed that even though the getObjectSummaries returns some data
// keyCount is still zero.
if (listResult.getObjectSummaries().size() == 0) {
// should never happen
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
}

View File

@ -251,7 +251,10 @@ public class S3Utils
.withMaxKeys(1);
final ListObjectsV2Result result = s3Client.listObjectsV2(request);
if (result.getKeyCount() == 0) {
// Using getObjectSummaries().size() instead of getKeyCount as, in some cases
// it is observed that even though the getObjectSummaries returns some data
// keyCount is still zero.
if (result.getObjectSummaries().size() == 0) {
throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
}
final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
@ -402,7 +403,8 @@ public class DruidSchema extends AbstractSchema
}
}
private void removeSegment(final DataSegment segment)
@VisibleForTesting
protected void removeSegment(final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getId());
@ -450,7 +452,8 @@ public class DruidSchema extends AbstractSchema
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
*/
private Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
@VisibleForTesting
protected Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> retVal = new HashSet<>();
@ -506,15 +509,26 @@ public class DruidSchema extends AbstractSchema
log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature);
final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments =
segmentMetadataInfo.get(segment.getDataSource());
SegmentMetadataHolder holder = dataSourceSegments.get(segment);
SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
.from(holder)
.withRowSignature(rowSignature)
.withNumRows(analysis.getNumRows())
.build();
dataSourceSegments.put(segment, updatedHolder);
setSegmentSignature(segment, updatedHolder);
retVal.add(segment);
if (dataSourceSegments == null) {
log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource());
} else {
SegmentMetadataHolder holder = dataSourceSegments.get(segment);
if (holder == null) {
log.warn(
"No segment[%s] found, skipping refresh",
segment.getId()
);
} else {
SegmentMetadataHolder updatedHolder = SegmentMetadataHolder
.from(holder)
.withRowSignature(rowSignature)
.withNumRows(analysis.getNumRows())
.build();
dataSourceSegments.put(segment, updatedHolder);
setSegmentSignature(segment, updatedHolder);
retVal.add(segment);
}
}
}
}
@ -628,7 +642,7 @@ public class DruidSchema extends AbstractSchema
return rowSignatureBuilder.build();
}
public Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
{
final Map<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<>();
synchronized (lock) {

View File

@ -63,6 +63,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DruidSchemaTest extends CalciteTestBase
{
@ -237,4 +238,38 @@ public class DruidSchemaTest extends CalciteTestBase
Assert.assertEquals("m1", fields.get(2).getName());
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
}
@Test
public void testNullDatasource() throws IOException
{
Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
Set<DataSegment> segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 3);
// segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2"
final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo2")).findFirst().orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
segmentMetadatas = schema.getSegmentMetadata();
segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 2);
}
@Test
public void testNullSegmentMetadataHolder() throws IOException
{
Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
Set<DataSegment> segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 3);
//remove one of the segments with datasource "foo"
final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo")).findFirst().orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from
segmentMetadatas = schema.getSegmentMetadata();
segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 2);
}
}