Remove DataSegmentFinder, InsertSegmentToDb, and descriptor.json file in deep storage (#6911)

* Remove DataSegmentFinder, InsertSegmentToDb, and descriptor.json file

* Delete the descriptor.json file when killing segments

* Fix test

* Add doc for HA

* Improve warning
Jihoon Son 2019-02-20 15:10:29 -08:00 committed by GitHub
parent dd34691004
commit 4e2b085201
54 changed files with 266 additions and 2678 deletions

View File

@ -24,7 +24,6 @@ import com.google.inject.Key;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
@ -56,11 +55,6 @@ public class Binders
return PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class));
}
public static MapBinder<String, DataSegmentFinder> dataSegmentFinderBinder(Binder binder)
{
return PolyBind.optionBinder(binder, Key.get(DataSegmentFinder.class));
}
public static MapBinder<String, TaskLogs> taskLogsBinder(Binder binder)
{
return PolyBind.optionBinder(binder, Key.get(TaskLogs.class));

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.util.Map;
import java.util.Set;
/**
* A DataSegmentFinder is responsible for finding Druid segments underneath a specified directory and optionally
* updating all descriptor.json files on deep storage with the correct loadSpec.
*/
@ExtensionPoint
public interface DataSegmentFinder
{
Logger log = new Logger(DataSegmentFinder.class);
/**
* This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath
* workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder.
* If not, it should throw SegmentLoadingException to let the caller know that descriptor.json exists
* while index.zip doesn't. If a segment is found and updateDescriptor is set, then this method should update the
* loadSpec in descriptor.json to reflect the location from where it was found. After the search, this method
* should return the set of segments that were found.
*
* @param workingDirPath the String representation of the working directory path
* @param updateDescriptor if true, update loadSpec in descriptor.json if loadSpec's location is different from where
* descriptor.json was found
*
* @return a set of segments that were found underneath workingDirPath
*/
Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException;
/**
* Adds dataSegment if it does not exist in timestampedSegments. If it exists, replaces entry if segmentModifiedAt is
* newer than stored timestamp.
*
* @param timestampedSegments map of <segmentID, Pair<segment, modifiedAt>> containing segments with modified time
* @param dataSegment segment to add
* @param segmentModifiedAt segment modified timestamp
*/
static void putInMapRetainingNewest(
Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments,
DataSegment dataSegment,
long segmentModifiedAt
)
{
timestampedSegments.merge(
dataSegment.getId(),
Pair.of(dataSegment, segmentModifiedAt),
(previous, current) -> {
log.warn("Multiple copies of segmentId [%s] found, using newest version", current.lhs.getId());
return previous.rhs > current.rhs ? previous : current;
}
);
}
}
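
For reference, a minimal, hypothetical sketch of the `putInMapRetainingNewest` contract above: when two copies of the same segment ID are found, the copy with the newer modification timestamp is retained. The class name and sample values are illustrative only, not part of this change.

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import java.util.HashMap;
import java.util.Map;

class PutInMapRetainingNewestSketch
{
  public static void main(String[] args)
  {
    final Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();

    final DataSegment segment = DataSegment
        .builder()
        .dataSource("wikipedia")
        .interval(Intervals.of("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
        .version("2015-10-21T22:07:57.074Z")
        .loadSpec(ImmutableMap.of("type", "local", "path", "/tmp/index.zip"))
        .build();

    // First copy of the segment, found with an older modification time.
    DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, segment, 1000L);

    // Second copy with the same segment ID but a newer modification time; it replaces
    // the first entry, and a warning about multiple copies is logged.
    DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, segment, 2000L);

    // The retained entry carries the newer timestamp.
    System.out.println(timestampedSegments.get(segment.getId()).rhs); // 2000
  }
}
```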

View File

@ -32,7 +32,10 @@ Derby is the default metadata store for Druid, however, it is not suitable for p
[MySQL](../development/extensions-core/mysql.html) and [PostgreSQL](../development/extensions-core/postgresql.html) are more production suitable metadata stores.
<div class="note caution">
Derby is not suitable for production use as a metadata store. Use MySQL or PostgreSQL instead.
The metadata storage holds all of the metadata that is essential for a Druid cluster to work.
For production clusters, consider using MySQL or PostgreSQL instead of Derby.
It is also highly recommended to set up a highly available environment,
because there is no way to restore the metadata if it is lost.
</div>
## Using derby

View File

@ -114,7 +114,7 @@ In this way, you can validate both push (at realtime node) and pull (at Historic
* DataSegmentPusher
Wherever your data storage (cloud storage service, distributed file system, etc.) is, you should be able to see two new files: `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) and `index.zip` (`partitionNum_index.zip` for HDFS data storage) after your ingestion task ends.
Wherever your data storage (cloud storage service, distributed file system, etc.) is, you should be able to see one new file: `index.zip` (`partitionNum_index.zip` for HDFS data storage) after your ingestion task ends.
* DataSegmentPuller
@ -147,7 +147,7 @@ To mark a segment as not used, you need to connect to your metadata storage and
To start a segment killing task, you need to access the old Coordinator console `http://<COORDINATOR_IP>:<COORDINATOR_PORT>/old-console/kill.html`, then select the appropriate datasource and input a time range (e.g. `2000/3000`).
After the killing task ends, both `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) and `index.zip` (`partitionNum_index.zip` for HDFS data storage) files should be deleted from the data storage.
After the killing task ends, the `index.zip` (`partitionNum_index.zip` for HDFS data storage) file should be deleted from the data storage.
### Adding a new Firehose

View File

@ -0,0 +1,40 @@
---
layout: doc_page
title: "High Availability"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# High Availability
A highly available setup is recommended for ZooKeeper, the metadata store, the Coordinator, the Overlord, and the Brokers.
- For highly-available ZooKeeper, you will need a cluster of 3 or 5 ZooKeeper nodes.
We recommend either installing ZooKeeper on its own hardware, or running 3 or 5 Master servers (where overlords or coordinators are running)
and configuring ZooKeeper on them appropriately. See the [ZooKeeper admin guide](https://zookeeper.apache.org/doc/current/zookeeperAdmin.html) for more details.
- For highly-available metadata storage, we recommend MySQL or PostgreSQL with replication and failover enabled.
See [MySQL HA/Scalability Guide](https://dev.mysql.com/doc/mysql-ha-scalability/en/)
and [PostgreSQL's High Availability, Load Balancing, and Replication](https://www.postgresql.org/docs/9.5/high-availability.html) for MySQL and PostgreSQL, respectively.
- For highly-available Druid Coordinators and Overlords, we recommend running multiple servers (see the configuration sketch after this list).
If they are all configured to use the same ZooKeeper cluster and metadata storage,
then they will automatically failover between each other as necessary.
Only one will be active at a time, but inactive servers will redirect to the currently active server.
- Druid Brokers can be scaled out and all running servers will be active and queryable.
We recommend placing them behind a load balancer.
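
As a rough illustration of the Coordinator/Overlord point above, the sketch below shows the kind of common configuration that every Coordinator and Overlord instance would share so that leader election and failover happen automatically. The hostnames and credentials are placeholders, and `druid.zk.service.host` is assumed here to be the ZooKeeper connection property; adapt it to your deployment.

```
# Hypothetical common runtime properties shared by every Coordinator and Overlord instance.
# All instances point at the same ZooKeeper ensemble (assumed property name) ...
druid.zk.service.host=zk1.example.com,zk2.example.com,zk3.example.com

# ... and at the same metadata store, so only one instance is active at a time
# and the others stand by to take over.
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://metadata.example.com:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
```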

View File

@ -1,156 +0,0 @@
---
layout: doc_page
title: "insert-segment-to-db Tool"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# insert-segment-to-db Tool
`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
to update the segment table in metadata storage after people manually migrate segments from one place to another.
It can also be used to insert missing segments into Druid, or even recover metadata storage by telling it where the
segments are stored.
**Note:** This tool simply scans the deep storage directory to reconstruct the metadata entries used to locate and
identify each segment. It does not have any understanding about whether those segments _should actually_ be written to
the metadata storage. In certain cases, this can lead to undesired or inconsistent results. Some examples of things to
watch out for:
- Dropped datasources will be re-enabled.
- The latest version of each segment set will be loaded by Druid, which in some cases may not be the version you
actually want. An example of this is a bad compaction job that generates segments which need to be manually rolled
back by removing that version from the metadata table. If these segments are not also removed from deep storage,
they will be imported back into the metadata table and overshadow the correct version.
- Some indexers such as the Kafka indexing service have the potential to generate more than one set of segments that
have the same segment ID but different contents. When the metadata is first written, the correct set of segments is
referenced and the other set is normally deleted from deep storage. It is possible however that an unhandled
exception could result in multiple sets of segments with the same segment ID remaining in deep storage. Since this
tool does not know which one is the 'correct' one to use, it will simply select the newest segment set and ignore
the other versions. If the wrong segment set is picked, the exactly-once semantics of the Kafka indexing service
will no longer hold true and you may get duplicated or dropped events.
With these considerations in mind, it is recommended that data migrations be done by exporting the original metadata
storage directly, since that is the definitive cluster state. This tool should be used as a last resort when a direct
export is not possible.
**Note:** This tool expects users to have a Druid cluster running in a "safe" mode, where there are no active tasks to interfere
with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.
In order to make it work, the user has to provide metadata storage credentials and the deep storage type through Java JVM arguments
or a runtime.properties file. Specifically, this tool needs to know:
```
druid.metadata.storage.type
druid.metadata.storage.connector.connectURI
druid.metadata.storage.connector.user
druid.metadata.storage.connector.password
druid.storage.type
```
Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
update descriptor.json (`partitionNum_descriptor.json` for HDFS data storage). These two can be provided through command line arguments.
`--workingDir` (Required)
The directory URI where segments are stored. This tool will recursively look for segments underneath this directory
and insert/update these segments in metadata storage.
Attention: workingDir must be a complete URI, which means it must be prefixed with scheme type. For example,
hdfs://hostname:port/segment_directory
`--updateDescriptor` (Optional)
If set to true, this tool will update the `loadSpec` field in `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) if the path in `loadSpec` is different from
where `descriptor.json` (`partitionNum_descriptor.json` for HDFS data storage) was found. Default value is `true`.
Note: you will also need to load different Druid extensions per the metadata and deep storage you use. For example, if you
use `mysql` as metadata storage and HDFS as deep storage, you should load `mysql-metadata-storage` and `druid-hdfs-storage`
extensions.
Example:
Suppose your metadata storage is `mysql` and you've migrated some segments to a directory in HDFS, and that directory looks
like this,
```
Directory path: /druid/storage/wikipedia
├── 2013-08-31T000000.000Z_2013-09-01T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   ├── 0_descriptor.json
│   └── 0_index.zip
├── 2013-09-01T000000.000Z_2013-09-02T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   ├── 0_descriptor.json
│   └── 0_index.zip
├── 2013-09-02T000000.000Z_2013-09-03T000000.000Z
│   └── 2015-10-21T22_07_57.074Z
│   ├── 0_descriptor.json
│   └── 0_index.zip
└── 2013-09-03T000000.000Z_2013-09-04T000000.000Z
└── 2015-10-21T22_07_57.074Z
├── 0_descriptor.json
└── 0_index.zip
```
To load all these segments into `mysql`, you can run the command below:
```
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-hdfs-storage\"]
-Ddruid.storage.type=hdfs
-cp $DRUID_CLASSPATH
org.apache.druid.cli.Main tools insert-segment-to-db --workingDir hdfs://host:port//druid/storage/wikipedia --updateDescriptor true
```
In this example, the `mysql` metadata storage and the deep storage type are provided through Java JVM arguments; you can optionally put all
of them in a runtime.properties file and include it in the Druid classpath. Note that we also include `mysql-metadata-storage`
and `druid-hdfs-storage` in the extension list.
After running this command, the segments table in `mysql` should store the new location for each segment we just inserted.
Note that for segments stored in HDFS, the Druid config must contain core-site.xml as described in the [Druid Docs](../tutorials/cluster.html), since the new location is stored as a relative path.
It is also possible to use `s3` as deep storage. In order to work with it, specify `s3` as deep storage type and load
[`druid-s3-extensions`](../development/extensions-core/s3.html) as an extension.
```
java
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.extensions.loadList=[\"mysql-metadata-storage\",\"druid-s3-extensions\"]
-Ddruid.storage.type=s3
-Ddruid.s3.accessKey=...
-Ddruid.s3.secretKey=...
-Ddruid.storage.bucket=your-bucket
-Ddruid.storage.baseKey=druid/storage/wikipedia
-Ddruid.storage.maxListingLength=1000
-cp $DRUID_CLASSPATH
org.apache.druid.cli.Main tools insert-segment-to-db --workingDir "druid/storage/wikipedia" --updateDescriptor true
```
Note that you can provide the location of segments with either `druid.storage.baseKey` or `--workingDir`. If both are
specified, `--workingDir` takes priority. `druid.storage.maxListingLength` determines the length of a
partial list when requesting an object listing from `s3`, and defaults to 1000.

View File

@ -128,6 +128,7 @@ layout: toc
* [Alerts](/docs/VERSION/operations/alerts.html)
* [Updating the Cluster](/docs/VERSION/operations/rolling-updates.html)
* [Different Hadoop Versions](/docs/VERSION/operations/other-hadoop.html)
* [High Availability](/docs/VERSION/operations/high-availability.html)
* [Performance FAQ](/docs/VERSION/operations/performance-faq.html)
* [Dump Segment Tool](/docs/VERSION/operations/dump-segment.html)
* [Insert Segment Tool](/docs/VERSION/operations/insert-segment-to-db.html)

View File

@ -19,7 +19,6 @@
package org.apache.druid.storage.azure;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -34,7 +33,6 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -47,18 +45,15 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
private static final Logger log = new Logger(AzureDataSegmentPusher.class);
private final AzureStorage azureStorage;
private final AzureAccountConfig config;
private final ObjectMapper jsonMapper;
@Inject
public AzureDataSegmentPusher(
AzureStorage azureStorage,
AzureAccountConfig config,
ObjectMapper jsonMapper
AzureAccountConfig config
)
{
this.azureStorage = azureStorage;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Deprecated
@ -112,25 +107,11 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
return ImmutableList.of("druid.azure");
}
public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment) throws
IOException
{
File descriptorFile = File.createTempFile("descriptor", ".json");
try (FileOutputStream stream = new FileOutputStream(descriptorFile)) {
stream.write(jsonMapper.writeValueAsBytes(segment));
}
return descriptorFile;
}
public Map<String, String> getAzurePaths(final DataSegment segment, final boolean useUniquePath)
public String getAzurePath(final DataSegment segment, final boolean useUniquePath)
{
final String storageDir = this.getStorageDir(segment, useUniquePath);
return ImmutableMap.of(
"index", StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
"descriptor", StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME)
);
return StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME);
}
@ -139,25 +120,20 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final int binaryVersion,
final long size,
final File compressedSegmentData,
final File descriptorFile,
final Map<String, String> azurePaths
final String azurePath
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath);
final DataSegment outSegment = segment
.withSize(size)
.withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index"))))
.withLoadSpec(this.makeLoadSpec(new URI(azurePath)))
.withBinaryVersion(binaryVersion);
log.info("Deleting file [%s]", compressedSegmentData);
compressedSegmentData.delete();
log.info("Deleting file [%s]", descriptorFile);
descriptorFile.delete();
return outSegment;
}
@ -169,17 +145,15 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
File zipOutFile = null;
File descriptorFile = null;
try {
final File outFile = zipOutFile = File.createTempFile("index", ".zip");
final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
final File descFile = descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
final Map<String, String> azurePaths = getAzurePaths(segment, useUniquePath);
final String azurePath = getAzurePath(segment, useUniquePath);
return AzureUtils.retryAzureOperation(
() -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths),
() -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath),
config.getMaxTries()
);
}
@ -191,11 +165,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
}
if (descriptorFile != null) {
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
}
}
}

View File

@ -44,7 +44,6 @@ public class AzureStorageDruidModule implements DruidModule
public static final String SCHEME = "azure";
public static final String STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
public static final String DESCRIPTOR_FILE_NAME = "descriptor.json";
public static final String INDEX_ZIP_FILE_NAME = "index.zip";
@Override

View File

@ -95,7 +95,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
@ -130,32 +130,25 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
public void getAzurePathsTest()
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
final String storageDir = pusher.getStorageDir(dataSegment, false);
Map<String, String> paths = pusher.getAzurePaths(dataSegment, false);
final String azurePath = pusher.getAzurePath(dataSegment, false);
assertEquals(
StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
paths.get("index")
);
assertEquals(
StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME),
paths.get("descriptor")
azurePath
);
}
@Test
public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
final int binaryVersion = 9;
final File compressedSegmentData = new File("index.zip");
final File descriptorFile = new File("descriptor.json");
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment, false);
final String azurePath = pusher.getAzurePath(dataSegment, false);
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
expectLastCall();
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePath);
expectLastCall();
replayAll();
@ -165,15 +158,14 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
binaryVersion,
0, // empty file
compressedSegmentData,
descriptorFile,
azurePaths
azurePath
);
assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());
assertEquals(binaryVersion, (int) pushedDataSegment.getBinaryVersion());
Map<String, Object> loadSpec = pushedDataSegment.getLoadSpec();
assertEquals(AzureStorageDruidModule.SCHEME, MapUtils.getString(loadSpec, "type"));
assertEquals(azurePaths.get("index"), MapUtils.getString(loadSpec, "blobPath"));
assertEquals(azurePath, MapUtils.getString(loadSpec, "blobPath"));
verifyAll();
}
@ -181,7 +173,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
@Test
public void getPathForHadoopTest()
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
String hadoopPath = pusher.getPathForHadoop();
Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
}
@ -189,7 +181,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
@Test
public void storageDirContainsNoColonsTest()
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
DataSegment withColons = dataSegment.withVersion("2018-01-05T14:54:09.295Z");
String segmentPath = pusher.getStorageDir(withColons, false);
Assert.assertFalse("Path should not contain any columns", segmentPath.contains(":"));

View File

@ -1,143 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.InputStreamContent;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class GoogleDataSegmentFinder implements DataSegmentFinder
{
private static final Logger LOG = new Logger(GoogleDataSegmentFinder.class);
private final GoogleStorage storage;
private final GoogleAccountConfig config;
private final ObjectMapper jsonMapper;
@Inject
public GoogleDataSegmentFinder(
final GoogleStorage storage,
final GoogleAccountConfig config,
final ObjectMapper jsonMapper
)
{
this.storage = storage;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
{
final Set<DataSegment> segments = new HashSet<>();
try {
Storage.Objects.List listObjects = storage.list(config.getBucket());
listObjects.setPrefix(workingDirPath);
Objects objects;
do {
objects = listObjects.execute();
List<StorageObject> items = objects.getItems();
if (items != null) {
for (StorageObject item : items) {
if ("descriptor.json".equals(GoogleUtils.toFilename(item.getName()))) {
final String descriptorJson = item.getName();
final String indexZip = GoogleUtils.indexZipForSegmentPath(descriptorJson);
if (storage.exists(item.getBucket(), indexZip)) {
InputStream is = storage.get(item.getBucket(), item.getName());
final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class);
LOG.info("Found segment [%s] located at [%s]", dataSegment.getId(), indexZip);
Map<String, Object> loadSpec = dataSegment.getLoadSpec();
if (!GoogleStorageDruidModule.SCHEME.equals(loadSpec.get("type")) ||
!indexZip.equals(loadSpec.get("path")) ||
!config.getBucket().equals(loadSpec.get("bucket"))) {
File descriptorFile = null;
try {
loadSpec.put("type", GoogleStorageDruidModule.SCHEME);
loadSpec.put("path", indexZip);
loadSpec.put("bucket", config.getBucket());
if (updateDescriptor) {
LOG.info(
"Updating loadSpec in descriptor.json at [%s] with new path [%s]",
descriptorJson,
indexZip
);
final byte[] bts = jsonMapper.writeValueAsBytes(dataSegment);
InputStreamContent mediaContent = new InputStreamContent("application/json", new ByteArrayInputStream(bts));
mediaContent.setLength(bts.length);
storage.insert(config.getBucket(), descriptorJson, mediaContent);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
if (descriptorFile != null) {
LOG.info("Deleting file [%s]", descriptorFile);
descriptorFile.delete();
}
}
}
segments.add(dataSegment);
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descriptor.json exists!?",
indexZip
);
}
}
}
}
} while (objects.getNextPageToken() != null);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "IO exception");
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
Throwables.propagate(e);
}
return segments;
}
}

View File

@ -56,6 +56,8 @@ public class GoogleDataSegmentKiller implements DataSegmentKiller
try {
deleteIfPresent(bucket, indexPath);
// descriptor.json is a file to store segment metadata in deep storage. This file is deprecated and not stored
// anymore, but we still delete it if it exists.
deleteIfPresent(bucket, descriptorPath);
}
catch (IOException e) {

View File

@ -19,7 +19,6 @@
package org.apache.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.FileContent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
@ -39,7 +38,6 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
@ -49,18 +47,15 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
private final GoogleStorage storage;
private final GoogleAccountConfig config;
private final ObjectMapper jsonMapper;
@Inject
public GoogleDataSegmentPusher(
final GoogleStorage storage,
final GoogleAccountConfig config,
final ObjectMapper jsonMapper
final GoogleAccountConfig config
)
{
this.storage = storage;
this.config = config;
this.jsonMapper = jsonMapper;
LOG.info("Configured Google as deep storage");
}
@ -84,16 +79,6 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
return ImmutableList.of("druid.google");
}
public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment)
throws IOException
{
File descriptorFile = File.createTempFile("descriptor", ".json");
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions at
// runtime, and because Guava deletes methods over time, which causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(segment));
return descriptorFile;
}
public void insert(final File file, final String contentType, final String path)
throws IOException
{
@ -125,24 +110,19 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
File indexFile = null;
File descriptorFile = null;
try {
indexFile = File.createTempFile("index", ".zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
final String storageDir = this.getStorageDir(segment, useUniquePath);
final String indexPath = buildPath(storageDir + "/" + "index.zip");
final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json");
final DataSegment outSegment = segment
.withSize(indexSize)
.withLoadSpec(makeLoadSpec(config.getBucket(), indexPath))
.withBinaryVersion(version);
descriptorFile = createDescriptorFile(jsonMapper, outSegment);
insert(indexFile, "application/zip", indexPath);
insert(descriptorFile, "application/json", descriptorPath);
return outSegment;
}
@ -154,11 +134,6 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
LOG.info("Deleting file [%s]", indexFile);
indexFile.delete();
}
if (descriptorFile != null) {
LOG.info("Deleting file [%s]", descriptorFile);
descriptorFile.delete();
}
}
}

View File

@ -93,8 +93,6 @@ public class GoogleStorageDruidModule implements DruidModule
.in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentKiller.class)
.in(LazySingleton.class);
Binders.dataSegmentFinderBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentFinder.class)
.in(LazySingleton.class);
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class);

View File

@ -1,223 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.api.client.http.InputStreamContent;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
public class GoogleDataSegmentFinderTest extends EasyMockSupport
{
private static final String SEGMENT_1_ZIP = "prefix/test/index.zip";
private static final String SEGMENT_1_DESCRIPTOR = "prefix/test/descriptor.json";
private static final String TEST_BUCKET = "testbucket";
private static final DataSegment SEGMENT_1 = DataSegment
.builder()
.dataSource("wikipedia")
.interval(Intervals.of("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.of(
"type",
GoogleStorageDruidModule.SCHEME,
"bucket",
TEST_BUCKET,
"path",
SEGMENT_1_ZIP
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
@BeforeClass
public static void setUpStatic()
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
}
@Test
public void testFindSegments() throws Exception
{
GoogleStorage storage = createMock(GoogleStorage.class);
GoogleAccountConfig config = new GoogleAccountConfig();
config.setBucket(TEST_BUCKET);
final GoogleDataSegmentFinder googleDataSegmentFinder = new GoogleDataSegmentFinder(storage, config, mapper);
StorageObject item1 = new StorageObject();
item1.setName(SEGMENT_1_DESCRIPTOR);
item1.setBucket(TEST_BUCKET);
Storage.Objects.List segmentList = createMock(Storage.Objects.List.class);
List<StorageObject> items = new ArrayList<StorageObject>();
items.add(item1);
Objects objects = new Objects();
objects.setItems(items);
objects.setNextPageToken(null);
InputStream stream1 = new ByteArrayInputStream(mapper.writeValueAsBytes(SEGMENT_1));
expect(storage.list(TEST_BUCKET)).andReturn(segmentList);
expect(segmentList.setPrefix("prefix")).andReturn(segmentList);
expect(segmentList.execute()).andReturn(objects);
expect(storage.exists(TEST_BUCKET, SEGMENT_1_ZIP)).andReturn(true);
expect(storage.get(TEST_BUCKET, SEGMENT_1_DESCRIPTOR)).andReturn(stream1);
replayAll();
final Set<DataSegment> segments = googleDataSegmentFinder.findSegments("prefix", false);
Assert.assertEquals(1, segments.size());
verifyAll();
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws Exception
{
GoogleStorage storage = createMock(GoogleStorage.class);
GoogleAccountConfig config = new GoogleAccountConfig();
config.setBucket(TEST_BUCKET);
final GoogleDataSegmentFinder googleDataSegmentFinder = new GoogleDataSegmentFinder(storage, config, mapper);
StorageObject item1 = new StorageObject();
item1.setName(SEGMENT_1_DESCRIPTOR);
item1.setBucket(TEST_BUCKET);
Storage.Objects.List segmentList = createMock(Storage.Objects.List.class);
List<StorageObject> items = new ArrayList<StorageObject>();
items.add(item1);
Objects objects = new Objects();
objects.setItems(items);
objects.setNextPageToken(null);
expect(storage.list(TEST_BUCKET)).andReturn(segmentList);
expect(segmentList.setPrefix("")).andReturn(segmentList);
expect(segmentList.execute()).andReturn(objects);
expect(storage.exists(TEST_BUCKET, SEGMENT_1_ZIP)).andReturn(false);
replayAll();
googleDataSegmentFinder.findSegments("", false);
verifyAll();
}
@Test
public void testFindSegmentsUpdateLoadSpec() throws Exception
{
GoogleStorage storage = createMock(GoogleStorage.class);
GoogleAccountConfig config = new GoogleAccountConfig();
config.setBucket(TEST_BUCKET);
final GoogleDataSegmentFinder googleDataSegmentFinder = new GoogleDataSegmentFinder(storage, config, mapper);
StorageObject item1 = new StorageObject();
item1.setName(SEGMENT_1_DESCRIPTOR);
item1.setBucket(TEST_BUCKET);
Storage.Objects.List segmentList = createMock(Storage.Objects.List.class);
List<StorageObject> items = new ArrayList<StorageObject>();
items.add(item1);
Objects objects = new Objects();
objects.setItems(items);
objects.setNextPageToken(null);
final DataSegment segmentMissingLoadSpec = DataSegment
.builder(SEGMENT_1)
.loadSpec(ImmutableMap.of())
.build();
InputStream stream1 = new ByteArrayInputStream(mapper.writeValueAsBytes(segmentMissingLoadSpec));
expect(storage.list(TEST_BUCKET)).andReturn(segmentList);
expect(segmentList.setPrefix("")).andReturn(segmentList);
expect(segmentList.execute()).andReturn(objects);
expect(storage.exists(TEST_BUCKET, SEGMENT_1_ZIP)).andReturn(true);
expect(storage.get(TEST_BUCKET, SEGMENT_1_DESCRIPTOR)).andReturn(stream1);
Capture<InputStreamContent> capture1 = Capture.newInstance();
storage.insert(EasyMock.eq(TEST_BUCKET), EasyMock.eq(SEGMENT_1_DESCRIPTOR), EasyMock.capture(capture1));
expectLastCall();
replayAll();
final Set<DataSegment> segments = googleDataSegmentFinder.findSegments("", true);
Assert.assertEquals(1, segments.size());
// Guaranteed there's only 1 element due to prior assert
DataSegment testSegment = segments.iterator().next();
Map<String, Object> testLoadSpec = testSegment.getLoadSpec();
Assert.assertEquals(GoogleStorageDruidModule.SCHEME, testLoadSpec.get("type"));
Assert.assertEquals(TEST_BUCKET, testLoadSpec.get("bucket"));
Assert.assertEquals(SEGMENT_1_ZIP, testLoadSpec.get("path"));
testSegment = mapper.readValue(capture1.getValue().getInputStream(), DataSegment.class);
testLoadSpec = testSegment.getLoadSpec();
Assert.assertEquals(GoogleStorageDruidModule.SCHEME, testLoadSpec.get("type"));
Assert.assertEquals(TEST_BUCKET, testLoadSpec.get("bucket"));
Assert.assertEquals(SEGMENT_1_ZIP, testLoadSpec.get("path"));
verifyAll();
}
}

View File

@ -19,10 +19,8 @@
package org.apache.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@ -47,11 +45,9 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
private static final String bucket = "bucket";
private static final String prefix = "prefix";
private static final String path = "prefix/test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private GoogleStorage storage;
private GoogleAccountConfig googleAccountConfig;
private ObjectMapper jsonMapper;
@Before
public void before()
@ -60,8 +56,6 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
googleAccountConfig = new GoogleAccountConfig();
googleAccountConfig.setBucket(bucket);
googleAccountConfig.setPrefix(prefix);
jsonMapper = new DefaultObjectMapper();
}
@Test
@ -86,17 +80,13 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
size
);
GoogleDataSegmentPusher pusher = createMockBuilder(
GoogleDataSegmentPusher.class
).withConstructor(
storage,
googleAccountConfig,
jsonMapper
).addMockedMethod("insert", File.class, String.class, String.class).createMock();
GoogleDataSegmentPusher pusher = createMockBuilder(GoogleDataSegmentPusher.class)
.withConstructor(storage, googleAccountConfig)
.addMockedMethod("insert", File.class, String.class, String.class)
.createMock();
final String storageDir = pusher.getStorageDir(segmentToPush, false);
final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";
pusher.insert(
EasyMock.anyObject(File.class),
@ -104,12 +94,6 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
EasyMock.eq(indexPath)
);
expectLastCall();
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/json"),
EasyMock.eq(descriptorPath)
);
expectLastCall();
replayAll();
@ -136,11 +120,7 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
StringBuilder sb = new StringBuilder();
sb.setLength(0);
config.setPrefix(sb.toString()); // avoid cached empty string
GoogleDataSegmentPusher pusher = new GoogleDataSegmentPusher(
storage,
config,
jsonMapper
);
GoogleDataSegmentPusher pusher = new GoogleDataSegmentPusher(storage, config);
Assert.assertEquals("/path", pusher.buildPath("/path"));
config.setPrefix(null);
Assert.assertEquals("/path", pusher.buildPath("/path"));

View File

@ -77,9 +77,12 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@ -222,7 +225,7 @@ public class OrcIndexGeneratorJobTest
null,
null,
null,
false,
true,
false,
false,
false,
@ -256,6 +259,13 @@ public class OrcIndexGeneratorJobTest
{
Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
IndexGeneratorJob
.getPublishedSegments(config)
.forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>())
.add(segment));
final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
int segmentNum = 0;
for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) {
Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
@ -272,19 +282,37 @@ public class OrcIndexGeneratorJobTest
Assert.assertTrue(segmentOutputFolder.exists());
Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length);
int rowCount = 0;
for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) {
File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum));
Assert.assertTrue(individualSegmentFolder.exists());
File descriptor = new File(individualSegmentFolder, "descriptor.json");
File indexZip = new File(individualSegmentFolder, "index.zip");
Assert.assertTrue(descriptor.exists());
Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
intervalToIndexFiles.computeIfAbsent(new Interval(currTime, currTime.plusDays(1)), k -> new ArrayList<>())
.add(indexZip);
}
}
Assert.assertEquals(intervalToSegments.size(), intervalToIndexFiles.size());
segmentNum = 0;
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<DataSegment> segments = entry.getValue();
final List<File> indexFiles = intervalToIndexFiles.get(interval);
Collections.sort(segments);
indexFiles.sort(Comparator.comparing(File::getAbsolutePath));
Assert.assertNotNull(indexFiles);
Assert.assertEquals(segments.size(), indexFiles.size());
Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
int rowCount = 0;
for (int i = 0; i < segments.size(); i++) {
final DataSegment dataSegment = segments.get(i);
final File indexZip = indexFiles.get(i);
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
@ -297,7 +325,7 @@ public class OrcIndexGeneratorJobTest
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Integer[] hashShardInfo = shardInfo[partitionNum];
Integer[] hashShardInfo = shardInfo[i];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions());

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>

View File

@ -1,148 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
public class HdfsDataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(HdfsDataSegmentFinder.class);
private final Configuration config;
private final ObjectMapper mapper;
@Inject
public HdfsDataSegmentFinder(Configuration config, ObjectMapper mapper)
{
this.config = config;
this.mapper = mapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPathStr, boolean updateDescriptor)
throws SegmentLoadingException
{
final Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
final Path workingDirPath = new Path(workingDirPathStr);
FileSystem fs;
try {
fs = workingDirPath.getFileSystem(config);
log.info(fs.getScheme());
log.info("FileSystem URI:" + fs.getUri());
if (!fs.exists(workingDirPath)) {
throw new SegmentLoadingException("Working directory [%s] doesn't exist.", workingDirPath);
}
if (!fs.isDirectory(workingDirPath)) {
throw new SegmentLoadingException("Working directory [%s] is not a directory!?", workingDirPath);
}
final RemoteIterator<LocatedFileStatus> it = fs.listFiles(workingDirPath, true);
while (it.hasNext()) {
final LocatedFileStatus locatedFileStatus = it.next();
final Path path = locatedFileStatus.getPath();
if (path.getName().endsWith("descriptor.json")) {
// There are 3 supported path formats:
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/descriptor.json
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_descriptor.json
// - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_UUID_descriptor.json
final String descriptorParts[] = path.getName().split("_");
Path indexZip = new Path(path.getParent(), "index.zip");
if (descriptorParts.length > 1) {
Preconditions.checkState(descriptorParts.length <= 3 &&
org.apache.commons.lang.StringUtils.isNumeric(descriptorParts[0]) &&
"descriptor.json".equals(descriptorParts[descriptorParts.length - 1]),
"Unexpected descriptor filename format [%s]", path
);
indexZip = new Path(
path.getParent(),
StringUtils.format(
"%s_%sindex.zip",
descriptorParts[0],
descriptorParts.length == 2 ? "" : descriptorParts[1] + "_"
)
);
}
if (fs.exists(indexZip)) {
final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getId(), indexZip);
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String pathWithoutScheme = indexZip.toUri().getPath();
if (!loadSpec.get("type").equals(HdfsStorageDruidModule.SCHEME) || !loadSpec.get("path")
.equals(pathWithoutScheme)) {
loadSpec.put("type", HdfsStorageDruidModule.SCHEME);
loadSpec.put("path", pathWithoutScheme);
if (updateDescriptor) {
log.info("Updating loadSpec in descriptor.json at [%s] with new path [%s]", path, pathWithoutScheme);
mapper.writeValue(fs.create(path, true), dataSegment);
}
}
DataSegmentFinder.putInMapRetainingNewest(
timestampedSegments,
dataSegment,
locatedFileStatus.getModificationTime()
);
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descripter.json exists!?",
indexZip
);
}
}
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", workingDirPath);
}
return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
}
}
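
For context, a hypothetical, self-contained sketch of how the removed finder above was typically driven, matching the constructor and `findSegments` signature shown; the class name and HDFS path are placeholders.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.storage.hdfs.HdfsDataSegmentFinder;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;

import java.util.Set;

class HdfsFinderUsageSketch
{
  public static void main(String[] args) throws SegmentLoadingException
  {
    // Hadoop configuration; picks up core-site.xml / hdfs-site.xml from the classpath.
    final Configuration hadoopConf = new Configuration();
    // Druid's default JSON mapper, used to deserialize descriptor.json into DataSegment.
    final ObjectMapper jsonMapper = new DefaultObjectMapper();

    final HdfsDataSegmentFinder finder = new HdfsDataSegmentFinder(hadoopConf, jsonMapper);

    // Recursively scan the working directory for *descriptor.json files and return the
    // corresponding segments without rewriting the descriptors (updateDescriptor = false).
    final Set<DataSegment> segments =
        finder.findSegments("hdfs://namenode:8020/druid/storage/wikipedia", false);

    segments.forEach(segment -> System.out.println(segment.getId()));
  }
}
```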

View File

@ -100,9 +100,9 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
throw new SegmentLoadingException("Unable to kill segment, failed to delete [%s]", segmentPath.toString());
}
if (!fs.delete(descriptorPath, false)) {
throw new SegmentLoadingException("Unable to kill segment, failed to delete [%s]", descriptorPath.toString());
}
// descriptor.json is a file to store segment metadata in deep storage. This file is deprecated and not stored
// anymore, but we still delete it if it exists.
fs.delete(descriptorPath, false);
removeEmptyParentDirectories(fs, segmentPath, zipParts.length > 1 ? 2 : 3);
}

View File

@ -24,8 +24,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.java.util.common.CompressionUtils;
@ -44,7 +42,6 @@ import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Map;
@ -138,31 +135,13 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
segment.getShardSpec().getPartitionNum(),
uniquePrefix
));
final Path outDescriptorFile = new Path(StringUtils.format(
"%s/%s/%d_%sdescriptor.json",
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum(),
uniquePrefix
));
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));
final Path tmpDescriptorFile = new Path(
tmpIndexFile.getParent(),
StringUtils.format("%s_descriptor.json", dataSegment.getShardSpec().getPartitionNum())
);
log.info("Creating descriptor file at[%s]", tmpDescriptorFile);
ByteSource
.wrap(jsonMapper.writeValueAsBytes(dataSegment))
.copyTo(new HdfsOutputStreamSupplier(fs, tmpDescriptorFile));
// Create parent if it does not exist, recreation is not an error
fs.mkdirs(outIndexFile.getParent());
copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
}
finally {
@ -194,24 +173,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
}
}
private static class HdfsOutputStreamSupplier extends ByteSink
{
private final FileSystem fs;
private final Path descriptorFile;
public HdfsOutputStreamSupplier(FileSystem fs, Path descriptorFile)
{
this.fs = fs;
this.descriptorFile = descriptorFile;
}
@Override
public OutputStream openStream() throws IOException
{
return fs.create(descriptorFile);
}
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{

View File

@ -92,7 +92,6 @@ public class HdfsStorageDruidModule implements DruidModule
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentFinderBinder(binder).addBinding(SCHEME).to(HdfsDataSegmentFinder.class).in(LazySingleton.class);
final Configuration conf = new Configuration();

View File

@ -1,325 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.storage.hdfs.HdfsDataSegmentFinder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Set;
/**
*/
public class HdfsDataSegmentFinderTest
{
private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
private static final String DESCRIPTOR_JSON = "descriptor.json";
private static final String INDEX_ZIP = "index.zip";
private static final DataSegment SEGMENT_1 = DataSegment
.builder()
.dataSource("wikipedia")
.interval(Intervals.of("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.of(
"type",
"hdfs",
"path",
"hdfs://abc.com:1234/somewhere/index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final DataSegment SEGMENT_2 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"))
.build();
private static final DataSegment SEGMENT_3 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.version("2015-10-22T22:07:57.074Z")
.build();
private static final DataSegment SEGMENT_4_0 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.shardSpec(new NumberedShardSpec(0, 2))
.build();
private static final DataSegment SEGMENT_4_1 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.shardSpec(new NumberedShardSpec(1, 2))
.build();
private static final DataSegment SEGMENT_5 = DataSegment
.builder()
.dataSource("wikipedia")
.interval(Intervals.of("2013-09-03T00:00:00.000Z/2013-09-04T00:00:00.000Z"))
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.of(
"type",
"hdfs",
"path",
"hdfs://abc.com:1234/somewhere/1_index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static MiniDFSCluster miniCluster;
private static File hdfsTmpDir;
private static URI uriBase;
private static Configuration conf;
private static FileSystem fs;
private Path dataSourceDir;
private Path descriptor1;
private Path descriptor2;
private Path descriptor3;
private Path descriptor4_0;
private Path descriptor4_1;
private Path descriptor5;
private Path indexZip1;
private Path indexZip2;
private Path indexZip3;
private Path indexZip4_0;
private Path indexZip4_1;
private Path indexZip5;
@BeforeClass
public static void setupStatic() throws IOException
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
hdfsTmpDir = File.createTempFile("hdfsDataSource", "dir");
if (!hdfsTmpDir.delete()) {
throw new IOE("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath());
}
conf = new Configuration(true);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath());
miniCluster = new MiniDFSCluster.Builder(conf).build();
uriBase = miniCluster.getURI();
fs = miniCluster.getFileSystem();
}
@AfterClass
public static void tearDownStatic() throws IOException
{
if (miniCluster != null) {
miniCluster.shutdown(true);
}
FileUtils.deleteDirectory(hdfsTmpDir);
}
@Before
public void setUp() throws IOException
{
dataSourceDir = new Path(new Path(uriBase), "/usr/dataSource");
descriptor1 = new Path(dataSourceDir, "interval1/v1/0/" + DESCRIPTOR_JSON);
descriptor2 = new Path(dataSourceDir, "interval2/v1/0/" + DESCRIPTOR_JSON);
descriptor3 = new Path(dataSourceDir, "interval3/v2/0/" + DESCRIPTOR_JSON);
descriptor4_0 = new Path(dataSourceDir, "interval4/v1/0/" + DESCRIPTOR_JSON);
descriptor4_1 = new Path(dataSourceDir, "interval4/v1/1/" + DESCRIPTOR_JSON);
descriptor5 = new Path(dataSourceDir, "interval5/v1/1/" + "1_" + DESCRIPTOR_JSON);
indexZip1 = new Path(descriptor1.getParent(), INDEX_ZIP);
indexZip2 = new Path(descriptor2.getParent(), INDEX_ZIP);
indexZip3 = new Path(descriptor3.getParent(), INDEX_ZIP);
indexZip4_0 = new Path(descriptor4_0.getParent(), INDEX_ZIP);
indexZip4_1 = new Path(descriptor4_1.getParent(), INDEX_ZIP);
indexZip5 = new Path(descriptor5.getParent(), "1_" + INDEX_ZIP);
mapper.writeValue(fs.create(descriptor1), SEGMENT_1);
mapper.writeValue(fs.create(descriptor2), SEGMENT_2);
mapper.writeValue(fs.create(descriptor3), SEGMENT_3);
mapper.writeValue(fs.create(descriptor4_0), SEGMENT_4_0);
mapper.writeValue(fs.create(descriptor4_1), SEGMENT_4_1);
mapper.writeValue(fs.create(descriptor5), SEGMENT_5);
create(indexZip1);
create(indexZip2);
create(indexZip3);
create(indexZip4_0);
create(indexZip4_1);
create(indexZip5);
}
private void create(Path indexZip1) throws IOException
{
try (FSDataOutputStream os = fs.create(indexZip1)) {
}
}
@Test
public void testFindSegments() throws Exception
{
final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper);
final Set<DataSegment> segments = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
Assert.assertEquals(6, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
DataSegment updatedSegment5 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getId().equals(SEGMENT_1.getId())) {
updatedSegment1 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_2.getId())) {
updatedSegment2 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_3.getId())) {
updatedSegment3 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_4_0.getId())) {
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_4_1.getId())) {
updatedSegment4_1 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_5.getId())) {
updatedSegment5 = dataSegment;
} else {
Assert.fail("Unexpected segment");
}
}
Assert.assertEquals(descriptor1.toUri().getPath(), getDescriptorPath(updatedSegment1));
Assert.assertEquals(descriptor2.toUri().getPath(), getDescriptorPath(updatedSegment2));
Assert.assertEquals(descriptor3.toUri().getPath(), getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0.toUri().getPath(), getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1.toUri().getPath(), getDescriptorPath(updatedSegment4_1));
Assert.assertEquals(descriptor5.toUri().getPath(), getDescriptorPathWithPartitionNum(updatedSegment5, 1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
final String serializedSegment5 = mapper.writeValueAsString(updatedSegment5);
// since updateDescriptor was not enabled, descriptor.json still has stale information
Assert.assertNotEquals(serializedSegment1, readContent(descriptor1));
Assert.assertNotEquals(serializedSegment2, readContent(descriptor2));
Assert.assertNotEquals(serializedSegment3, readContent(descriptor3));
Assert.assertNotEquals(serializedSegment4_0, readContent(descriptor4_0));
Assert.assertNotEquals(serializedSegment4_1, readContent(descriptor4_1));
Assert.assertNotEquals(serializedSegment5, readContent(descriptor5));
// enable updateDescriptor so that the descriptor.json files will be updated to reflect the new loadSpec
final Set<DataSegment> segments2 = hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1, readContent(descriptor1));
Assert.assertEquals(serializedSegment2, readContent(descriptor2));
Assert.assertEquals(serializedSegment3, readContent(descriptor3));
Assert.assertEquals(serializedSegment4_0, readContent(descriptor4_0));
Assert.assertEquals(serializedSegment4_1, readContent(descriptor4_1));
Assert.assertEquals(serializedSegment5, readContent(descriptor5));
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws Exception
{
// remove one of the index.zip files while keeping its descriptor.json
fs.delete(indexZip4_1, false);
final HdfsDataSegmentFinder hdfsDataSegmentFinder = new HdfsDataSegmentFinder(conf, mapper);
hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false);
}
@Test
public void testPreferNewestSegment() throws Exception
{
dataSourceDir = new Path(new Path(uriBase), "/usr/replicaDataSource");
descriptor1 = new Path(dataSourceDir, StringUtils.format("interval1/v1/%d_%s_%s", 0, "older", DESCRIPTOR_JSON));
descriptor2 = new Path(dataSourceDir, StringUtils.format("interval1/v1/%d_%s_%s", 0, "newer", DESCRIPTOR_JSON));
indexZip1 = new Path(descriptor1.getParent(), StringUtils.format("%d_%s_%s", 0, "older", INDEX_ZIP));
indexZip2 = new Path(descriptor2.getParent(), StringUtils.format("%d_%s_%s", 0, "newer", INDEX_ZIP));
mapper.writeValue(fs.create(descriptor1), SEGMENT_1);
mapper.writeValue(fs.create(descriptor2), SEGMENT_1);
create(indexZip1);
Thread.sleep(1000);
create(indexZip2);
final Set<DataSegment> segments = new HdfsDataSegmentFinder(conf, mapper).findSegments(
dataSourceDir.toString(), false
);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(indexZip2.toUri().getPath(), segments.iterator().next().getLoadSpec().get("path"));
}
private String getDescriptorPath(DataSegment segment)
{
final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent() + "/" + DESCRIPTOR_JSON;
}
private String getDescriptorPathWithPartitionNum(DataSegment segment, int partitionNum)
{
final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent() + "/" + partitionNum + "_" + DESCRIPTOR_JSON;
}
private String readContent(Path descriptor) throws IOException
{
final FSDataInputStream is = fs.open(descriptor);
final String content = IOUtils.toString(is);
is.close();
return content;
}
}

View File

@ -131,7 +131,6 @@ public class HdfsDataSegmentKillerTest
Assert.assertTrue(fs.mkdirs(version11Dir));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_index.zip", 3)));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_descriptor.json", 3)));
killer.kill(getSegmentWithPath(new Path(version11Dir, "3_index.zip").toString()));
@ -168,7 +167,6 @@ public class HdfsDataSegmentKillerTest
Assert.assertTrue(fs.mkdirs(version11Dir));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_%s_index.zip", 3, uuid)));
fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_%s_descriptor.json", 3, uuid)));
killer.kill(getSegmentWithPath(new Path(version11Dir, StringUtils.format("%s_%s_index.zip", 3, uuid)).toString()));
@ -202,7 +200,6 @@ public class HdfsDataSegmentKillerTest
{
Assert.assertTrue(fs.mkdirs(path));
fs.createNewFile(new Path(path, "index.zip"));
fs.createNewFile(new Path(path, "descriptor.json"));
}
private DataSegment getSegmentWithPath(String path)

View File

@ -235,33 +235,15 @@ public class HdfsDataSegmentPusherTest
pushedSegment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
File descriptorFile = new File(StringUtils.format(
"%s/%s/%d_descriptor.json",
storageDirectory,
segmentPath,
pushedSegment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(descriptorFile.exists());
//read actual data from descriptor file.
DataSegment fromDescriptorFileDataSegment = objectMapper.readValue(descriptorFile, DataSegment.class);
Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize());
Assert.assertEquals(segments[i], pushedSegment);
Assert.assertEquals(ImmutableMap.of(
"type",
"hdfs",
"path",
indexUri
), fromDescriptorFileDataSegment.getLoadSpec());
// rename directory after push
segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment, false);
indexFile = new File(StringUtils.format(
"%s/%s/%d_index.zip",
storageDirectory,
segmentPath,
fromDescriptorFileDataSegment.getShardSpec().getPartitionNum()
pushedSegment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
@ -340,13 +322,6 @@ public class HdfsDataSegmentPusherTest
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(indexFile.exists());
File descriptorFile = new File(StringUtils.format(
"%s/%s/%d_descriptor.json",
storageDirectory,
segmentPath,
segment.getShardSpec().getPartitionNum()
));
Assert.assertTrue(descriptorFile.exists());
// push twice will fail and temp dir cleaned
File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
@ -485,29 +460,6 @@ public class HdfsDataSegmentPusherTest
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON,
hdfsDataSegmentPusher
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
@ -587,7 +539,7 @@ public class HdfsDataSegmentPusherTest
-1
),
JobHelper.INDEX_ZIP,
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig(), objectMapper)
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig())
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
@ -595,29 +547,6 @@ public class HdfsDataSegmentPusherTest
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON,
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig(), objectMapper)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
@ -633,7 +562,7 @@ public class HdfsDataSegmentPusherTest
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0),
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig(), objectMapper)
new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig())
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"

View File

@ -2536,7 +2536,7 @@ public class KafkaIndexTaskTest
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig, objectMapper);
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override

View File

@ -2742,7 +2742,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig, objectMapper);
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override

View File

@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class S3DataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(S3DataSegmentFinder.class);
private final ServerSideEncryptingAmazonS3 s3Client;
private final ObjectMapper jsonMapper;
private final S3DataSegmentPusherConfig config;
@Inject
public S3DataSegmentFinder(
ServerSideEncryptingAmazonS3 s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
{
final Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
try {
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
config.getBucket(),
workingDirPath.length() == 0 ? config.getBaseKey() : workingDirPath,
config.getMaxListingLength()
);
while (objectSummaryIterator.hasNext()) {
final S3ObjectSummary objectSummary = objectSummaryIterator.next();
if ("descriptor.json".equals(S3Utils.toFilename(objectSummary.getKey()))) {
final String descriptorJson = objectSummary.getKey();
String indexZip = S3Utils.indexZipForSegmentPath(descriptorJson);
if (S3Utils.isObjectInBucketIgnoringPermission(s3Client, config.getBucket(), indexZip)) {
try (S3Object indexObject = s3Client.getObject(config.getBucket(), descriptorJson);
S3ObjectInputStream is = indexObject.getObjectContent()) {
final ObjectMetadata objectMetadata = indexObject.getObjectMetadata();
final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getId(), indexZip);
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
if (!S3StorageDruidModule.SCHEME.equals(loadSpec.get("type")) ||
!indexZip.equals(loadSpec.get("key")) ||
!config.getBucket().equals(loadSpec.get("bucket"))) {
loadSpec.put("type", S3StorageDruidModule.SCHEME);
loadSpec.put("key", indexZip);
loadSpec.put("bucket", config.getBucket());
if (updateDescriptor) {
log.info(
"Updating loadSpec in descriptor.json at [%s] with new path [%s]",
descriptorJson,
indexObject
);
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(jsonMapper.writeValueAsString(dataSegment))
);
s3Client.putObject(config.getBucket(), descriptorJson, bais, objectMetadata);
}
}
DataSegmentFinder.putInMapRetainingNewest(
timestampedSegments,
dataSegment,
objectMetadata.getLastModified() == null ? 0 : objectMetadata.getLastModified().getTime()
);
}
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descriptor.json exists!?",
indexZip
);
}
}
}
}
catch (AmazonServiceException e) {
throw new SegmentLoadingException(e, "Problem interacting with S3");
}
catch (IOException e) {
throw new SegmentLoadingException(e, "IO exception");
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
Throwables.propagate(e);
}
return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
}
}

View File

@ -50,12 +50,14 @@ public class S3DataSegmentKiller implements DataSegmentKiller
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
String s3DescriptorPath = descriptorPathForSegmentPath(s3Path);
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
// descriptor.json is a file used to store segment metadata in deep storage. The file is deprecated and no longer
// written, but we still delete it if it exists.
if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
@ -66,6 +68,11 @@ public class S3DataSegmentKiller implements DataSegmentKiller
}
}
private static String descriptorPathForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf('/')) + "/descriptor.json";
}
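A quick illustration of what the new private helper computes; the key below is hypothetical and only demonstrates the substring arithmetic mirrored from the method above.

public class DescriptorPathExample
{
  public static void main(String[] args)
  {
    // Everything after the last '/' is replaced with the legacy descriptor name.
    final String s3Path = "dataSource1/interval1/v1/0/index.zip";
    final String descriptorPath = s3Path.substring(0, s3Path.lastIndexOf('/')) + "/descriptor.json";
    System.out.println(descriptorPath); // dataSource1/interval1/v1/0/descriptor.json
  }
}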
@Override
public void killAll()
{

View File

@ -68,7 +68,6 @@ public class S3DataSegmentMover implements DataSegmentMover
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
@ -77,7 +76,6 @@ public class S3DataSegmentMover implements DataSegmentMover
targetS3BaseKey,
DataSegmentPusher.getDefaultStorageDir(segment, false)
);
final String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
@ -87,7 +85,6 @@ public class S3DataSegmentMover implements DataSegmentMover
}
safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
safeMove(s3Bucket, s3DescriptorPath, targetS3Bucket, targetS3DescriptorPath);
return segment.withLoadSpec(
ImmutableMap.<String, Object>builder()

View File

@ -21,7 +21,6 @@ package org.apache.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -36,7 +35,6 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
@ -46,18 +44,15 @@ public class S3DataSegmentPusher implements DataSegmentPusher
private final ServerSideEncryptingAmazonS3 s3Client;
private final S3DataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public S3DataSegmentPusher(
ServerSideEncryptingAmazonS3 s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
S3DataSegmentPusherConfig config
)
{
this.s3Client = s3Client;
this.config = config;
this.jsonMapper = jsonMapper;
log.info("Configured S3 as deep storage");
}
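Since no descriptor.json is serialized anymore, the pushers drop their ObjectMapper dependency and construction becomes simpler. Below is a hedged sketch of the new wiring, assuming an already-built ServerSideEncryptingAmazonS3 client; the bucket and base key values are hypothetical.

import org.apache.druid.storage.s3.S3DataSegmentPusher;
import org.apache.druid.storage.s3.S3DataSegmentPusherConfig;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;

final class PusherWiringExample
{
  // Two-argument constructor after this patch; the ObjectMapper parameter is gone.
  static S3DataSegmentPusher buildPusher(ServerSideEncryptingAmazonS3 s3Client)
  {
    final S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
    config.setBucket("my-druid-bucket");  // hypothetical bucket
    config.setBaseKey("druid/segments");  // hypothetical base key
    return new S3DataSegmentPusher(s3Client, config);
  }
}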
@ -99,20 +94,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher
.withLoadSpec(makeLoadSpec(config.getBucket(), s3Path))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
final File descriptorFile = File.createTempFile("druid", "descriptor.json");
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions at
// runtime, and because Guava removes methods over time, which causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
try {
return S3Utils.retryS3Operation(
() -> {
uploadFileIfPossible(config.getBucket(), s3Path, zipOutFile);
uploadFileIfPossible(
config.getBucket(),
S3Utils.descriptorPathForSegmentPath(s3Path),
descriptorFile
);
return outSegment;
}
@ -127,8 +112,6 @@ public class S3DataSegmentPusher implements DataSegmentPusher
finally {
log.info("Deleting temporary cached index.zip");
zipOutFile.delete();
log.info("Deleting temporary cached descriptor.json");
descriptorFile.delete();
}
}

View File

@ -106,7 +106,6 @@ public class S3StorageDruidModule implements DruidModule
.to(S3DataSegmentArchiver.class)
.in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class);
Binders.dataSegmentFinderBinder(binder).addBinding("s3").to(S3DataSegmentFinder.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);
JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class);

View File

@ -172,11 +172,6 @@ public class S3Utils
) + "/index.zip";
}
static String descriptorPathForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf('/')) + "/descriptor.json";
}
static String indexZipForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf('/')) + "/index.zip";

View File

@ -1,516 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class S3DataSegmentFinderTest
{
private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
private static final DataSegment SEGMENT_1 = DataSegment
.builder()
.dataSource("wikipedia")
.interval(Intervals.of("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.of(
"type",
"s3_zip",
"bucket",
"bucket1",
"key",
"abc/somewhere/index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final DataSegment SEGMENT_2 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"))
.build();
private static final DataSegment SEGMENT_3 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.version("2015-10-22T22:07:57.074Z")
.build();
private static final DataSegment SEGMENT_4_0 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.shardSpec(new NumberedShardSpec(0, 2))
.build();
private static final DataSegment SEGMENT_4_1 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.shardSpec(new NumberedShardSpec(1, 2))
.build();
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
ServerSideEncryptingAmazonS3 mockS3Client;
S3DataSegmentPusherConfig config;
private String bucket;
private String baseKey;
private String descriptor1;
private String descriptor2;
private String descriptor3;
private String descriptor4_0;
private String descriptor4_1;
private String indexZip1;
private String indexZip2;
private String indexZip3;
private String indexZip4_0;
private String indexZip4_1;
@BeforeClass
public static void setUpStatic()
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
}
@Before
public void setUp() throws Exception
{
bucket = "bucket1";
baseKey = "dataSource1";
config = new S3DataSegmentPusherConfig();
config.setBucket(bucket);
config.setBaseKey(baseKey);
mockS3Client = new MockAmazonS3Client(temporaryFolder.newFolder());
descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval1/v1/0/");
descriptor2 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval2/v1/0/");
descriptor3 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval3/v2/0/");
descriptor4_0 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval4/v1/0/");
descriptor4_1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval4/v1/1/");
indexZip1 = S3Utils.indexZipForSegmentPath(descriptor1);
indexZip2 = S3Utils.indexZipForSegmentPath(descriptor2);
indexZip3 = S3Utils.indexZipForSegmentPath(descriptor3);
indexZip4_0 = S3Utils.indexZipForSegmentPath(descriptor4_0);
indexZip4_1 = S3Utils.indexZipForSegmentPath(descriptor4_1);
mockS3Client.putObject(bucket, descriptor1, mapper.writeValueAsString(SEGMENT_1));
mockS3Client.putObject(bucket, descriptor2, mapper.writeValueAsString(SEGMENT_2));
mockS3Client.putObject(bucket, descriptor3, mapper.writeValueAsString(SEGMENT_3));
mockS3Client.putObject(bucket, descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0));
mockS3Client.putObject(bucket, descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1));
mockS3Client.putObject(bucket, indexZip1, "dummy");
mockS3Client.putObject(bucket, indexZip2, "dummy");
mockS3Client.putObject(bucket, indexZip3, "dummy");
mockS3Client.putObject(bucket, indexZip4_0, "dummy");
mockS3Client.putObject(bucket, indexZip4_1, "dummy");
}
@Test
public void testFindSegments() throws Exception
{
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
Assert.assertEquals(5, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getId().equals(SEGMENT_1.getId())) {
updatedSegment1 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_2.getId())) {
updatedSegment2 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_3.getId())) {
updatedSegment3 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_4_0.getId())) {
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_4_1.getId())) {
updatedSegment4_1 = dataSegment;
} else {
Assert.fail("Unexpected segment identifier : " + dataSegment.getId());
}
}
Assert.assertEquals(descriptor1, getDescriptorPath(updatedSegment1));
Assert.assertEquals(descriptor2, getDescriptorPath(updatedSegment2));
Assert.assertEquals(descriptor3, getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0, getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1, getDescriptorPath(updatedSegment4_1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
Assert.assertNotEquals(
serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent())
);
Assert.assertNotEquals(
serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent())
);
final Set<DataSegment> segments2 = s3DataSegmentFinder.findSegments("", true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(
serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent())
);
Assert.assertEquals(
serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent())
);
Assert.assertEquals(
serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent())
);
Assert.assertEquals(
serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent())
);
Assert.assertEquals(
serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent())
);
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws SegmentLoadingException
{
mockS3Client.deleteObject(bucket, indexZip4_1);
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
s3DataSegmentFinder.findSegments("", false);
}
@Test
public void testFindSegmentsWithmaxListingLength() throws SegmentLoadingException
{
config.setMaxListingLength(3);
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
Assert.assertEquals(5, segments.size());
}
@Test
public void testFindSegmentsWithworkingDirPath() throws SegmentLoadingException
{
config.setBaseKey("");
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments(baseKey, false);
Assert.assertEquals(5, segments.size());
}
@Test
public void testFindSegmentsUpdateLoadSpec() throws Exception
{
config.setBucket("amazing");
final DataSegment segmentMissingLoadSpec = DataSegment.builder(SEGMENT_1).loadSpec(ImmutableMap.of()).build();
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final String segmentPath = baseKey + "/interval_missing_load_spec/v1/1/";
final String descriptorPath = S3Utils.descriptorPathForSegmentPath(segmentPath);
final String indexPath = S3Utils.indexZipForSegmentPath(segmentPath);
mockS3Client.putObject(config.getBucket(), descriptorPath, mapper.writeValueAsString(segmentMissingLoadSpec));
mockS3Client.putObject(config.getBucket(), indexPath, "dummy");
Set<DataSegment> segments = s3DataSegmentFinder.findSegments(segmentPath, false);
Assert.assertEquals(1, segments.size());
// Guaranteed there's only 1 element due to prior assert
DataSegment testSegment = segments.iterator().next();
Map<String, Object> testLoadSpec = testSegment.getLoadSpec();
Assert.assertEquals("amazing", testLoadSpec.get("bucket"));
Assert.assertEquals("s3_zip", testLoadSpec.get("type"));
Assert.assertEquals(indexPath, testLoadSpec.get("key"));
}
@Test
public void testPreferNewestSegment() throws Exception
{
baseKey = "replicaDataSource";
config = new S3DataSegmentPusherConfig();
config.setBucket(bucket);
config.setBaseKey(baseKey);
descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval10/v1/0/older/");
descriptor2 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval10/v1/0/newer/");
indexZip1 = S3Utils.indexZipForSegmentPath(descriptor1);
indexZip2 = S3Utils.indexZipForSegmentPath(descriptor2);
mockS3Client.putObject(bucket, descriptor1, mapper.writeValueAsString(SEGMENT_1));
mockS3Client.putObject(bucket, indexZip1, "dummy");
Thread.sleep(1000);
mockS3Client.putObject(bucket, descriptor2, mapper.writeValueAsString(SEGMENT_1));
mockS3Client.putObject(bucket, indexZip2, "dummy");
final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper);
final Set<DataSegment> segments = s3DataSegmentFinder.findSegments("", false);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(indexZip2, segments.iterator().next().getLoadSpec().get("key"));
}
private String getDescriptorPath(DataSegment segment)
{
return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
}
private static class MockAmazonS3Client extends ServerSideEncryptingAmazonS3
{
private final File baseDir;
private final Map<String, Map<String, ObjectMetadata>> storage = new HashMap<>();
public MockAmazonS3Client(File baseDir)
{
super(new AmazonS3Client(), new NoopServerSideEncryption());
this.baseDir = baseDir;
}
@Override
public boolean doesObjectExist(String bucketName, String objectName)
{
final Map<String, ObjectMetadata> keys = storage.get(bucketName);
if (keys != null) {
return keys.keySet().contains(objectName);
}
return false;
}
@Override
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request)
{
final String bucketName = listObjectsV2Request.getBucketName();
final String prefix = listObjectsV2Request.getPrefix();
final List<String> keysOrigin = storage.get(bucketName) == null
? ImmutableList.of()
: new ArrayList<>(storage.get(bucketName).keySet());
Predicate<String> prefixFilter = new Predicate<String>()
{
@Override
public boolean apply(@Nullable String input)
{
return input.startsWith(prefix);
}
};
ImmutableList<String> keys = ImmutableList.copyOf(
Ordering.natural().sortedCopy(Iterables.filter(keysOrigin, prefixFilter))
);
int startOffset = 0;
if (listObjectsV2Request.getContinuationToken() != null) {
startOffset = keys.indexOf(listObjectsV2Request.getContinuationToken()) + 1;
}
int endOffset = startOffset + listObjectsV2Request.getMaxKeys(); // exclusive
if (endOffset > keys.size()) {
endOffset = keys.size();
}
String newPriorLastkey = keys.get(endOffset - 1);
if (endOffset == (keys.size())) {
newPriorLastkey = null;
}
List<S3ObjectSummary> objects = new ArrayList<>();
for (String objectKey : keys.subList(startOffset, endOffset)) {
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucketName);
objectSummary.setKey(objectKey);
objects.add(objectSummary);
}
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(bucketName);
result.setKeyCount(objects.size());
result.getObjectSummaries().addAll(objects);
result.setContinuationToken(newPriorLastkey);
result.setTruncated(newPriorLastkey != null);
return result;
}
@Override
public S3Object getObject(String bucketName, String objectKey)
{
if (!storage.containsKey(bucketName)) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest");
ex.setStatusCode(404);
ex.setErrorCode("NoSuchBucket");
throw ex;
}
if (!storage.get(bucketName).keySet().contains(objectKey)) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest");
ex.setStatusCode(404);
ex.setErrorCode("NoSuchKey");
throw ex;
}
final File objectPath = new File(baseDir, objectKey);
S3Object storageObject = new S3Object();
storageObject.setBucketName(bucketName);
storageObject.setKey(objectKey);
storageObject.setObjectMetadata(storage.get(bucketName).get(objectKey));
try {
storageObject.setObjectContent(new FileInputStream(objectPath));
}
catch (FileNotFoundException e) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest", e);
ex.setStatusCode(500);
ex.setErrorCode("InternalError");
throw ex;
}
return storageObject;
}
@Override
public S3Object getObject(GetObjectRequest request)
{
return getObject(request.getBucketName(), request.getKey());
}
@Override
public PutObjectResult putObject(String bucketName, String key, String data)
{
ObjectMetadata metadata = new ObjectMetadata();
metadata.setLastModified(DateTimes.nowUtc().toDate());
return putObject(bucketName, key, new ByteArrayInputStream(StringUtils.toUtf8(data)), metadata);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata)
{
if (!storage.containsKey(bucketName)) {
storage.put(bucketName, new HashMap<>());
}
storage.get(bucketName).put(key, metadata);
final File objectPath = new File(baseDir, key);
if (!objectPath.getParentFile().exists()) {
objectPath.getParentFile().mkdirs();
}
try {
try (
InputStream in = input
) {
FileUtils.copyInputStreamToFile(in, objectPath);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return new PutObjectResult();
}
@Override
public void deleteObject(String bucketName, String objectKey)
{
storage.get(bucketName).remove(objectKey);
final File objectPath = new File(baseDir, objectKey);
objectPath.delete();
}
}
}

View File

@ -78,10 +78,6 @@ public class S3DataSegmentMoverTest
"main",
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"
);
mockS3Client.putObject(
"main",
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"
);
DataSegment movedSegment = mover.move(
sourceSegment,
@ -104,10 +100,6 @@ public class S3DataSegmentMoverTest
"archive",
"targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"
);
mockS3Client.putObject(
"archive",
"targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"
);
DataSegment movedSegment = mover.move(
sourceSegment,

View File

@ -24,25 +24,18 @@ import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Files;
import org.apache.commons.io.IOUtils;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.regex.Pattern;
@ -94,34 +87,13 @@ public class S3DataSegmentPusherTest
.andReturn(new PutObjectResult())
.once();
EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once();
Capture<PutObjectRequest> capturedPutRequest = Capture.newInstance();
ValueContainer<String> capturedS3SegmentJson = new ValueContainer<>();
EasyMock.expect(s3Client.putObject(EasyMock.capture(capturedPutRequest)))
.andAnswer(
new IAnswer<PutObjectResult>()
{
@Override
public PutObjectResult answer() throws Throwable
{
capturedS3SegmentJson.setValue(
IOUtils.toString(new FileInputStream(capturedPutRequest.getValue().getFile()), "utf-8")
);
return new PutObjectResult();
}
}
)
.once();
EasyMock.replay(s3Client);
S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
config.setBucket("bucket");
config.setBaseKey("key");
ObjectMapper objectMapper = new DefaultObjectMapper();
S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config, objectMapper);
S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config);
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
@ -153,10 +125,6 @@ public class S3DataSegmentPusherTest
);
Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type"));
// Verify that the pushed S3Object contains the correct data
String segmentJson = objectMapper.writeValueAsString(segment);
Assert.assertEquals(segmentJson, capturedS3SegmentJson.getValue());
EasyMock.verify(s3Client);
}
}

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
<Logger level="debug" name="org.apache.druid" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>

View File

@ -826,13 +826,6 @@ public class IndexGeneratorJob implements Jobby
JobHelper.INDEX_ZIP,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeFileNamePath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate,
JobHelper.DESCRIPTOR_JSON,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeTmpPath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,

View File

@ -94,7 +94,6 @@ public class JobHelper
}
public static final String INDEX_ZIP = "index.zip";
public static final String DESCRIPTOR_JSON = "descriptor.json";
/**
* Authenticates against a secured Hadoop cluster
@ -423,7 +422,6 @@ public class JobHelper
final Progressable progressable,
final File mergedBase,
final Path finalIndexZipFilePath,
final Path finalDescriptorPath,
final Path tmpPath,
DataSegmentPusher dataSegmentPusher
)
@ -472,12 +470,6 @@ public class JobHelper
);
}
writeSegmentDescriptor(
outputFS,
finalSegment,
finalDescriptorPath,
progressable
);
return finalSegment;
}

View File

@ -563,13 +563,6 @@ public class HadoopConverterJob
JobHelper.INDEX_ZIP,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeFileNamePath(
baseOutputPath,
outputFS,
finalSegmentTemplate,
JobHelper.DESCRIPTOR_JSON,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeTmpPath(
baseOutputPath,
outputFS,

View File

@ -386,27 +386,11 @@ public class BatchDeltaIngestionTest
Assert.assertTrue(segmentFolder.exists());
File descriptor = new File(segmentFolder, "descriptor.json");
File indexZip = new File(segmentFolder, "index.zip");
Assert.assertTrue(descriptor.exists());
Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = MAPPER.readValue(descriptor, DataSegment.class);
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(INTERVAL_FULL, dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals(expectedDimensions, dataSegment.getDimensions());
Assert.assertEquals(expectedMetrics, dataSegment.getMetrics());
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals(0, spec.getPartitionNum());
Assert.assertEquals(1, spec.getPartitions());
File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
new LocalDataSegmentPuller().getSegmentFiles(dataSegment, tmpUnzippedSegmentDir);
new LocalDataSegmentPuller().getSegmentFiles(indexZip, tmpUnzippedSegmentDir);
QueryableIndex index = INDEX_IO.loadIndex(tmpUnzippedSegmentDir);
StorageAdapter adapter = new QueryableIndexStorageAdapter(index);

View File

@ -73,9 +73,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@RunWith(Parameterized.class)
@ -520,7 +523,7 @@ public class IndexGeneratorJobTest
null,
maxRowsInMemory,
maxBytesInMemory,
false,
true,
false,
false,
false,
@ -593,6 +596,13 @@ public class IndexGeneratorJobTest
{
Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
IndexGeneratorJob
.getPublishedSegments(config)
.forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>())
.add(segment));
final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
int segmentNum = 0;
for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) {
Object[][] shardInfo = shardInfoForEachSegment[segmentNum++];
@ -613,14 +623,34 @@ public class IndexGeneratorJobTest
File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum));
Assert.assertTrue(individualSegmentFolder.exists());
File descriptor = new File(individualSegmentFolder, "descriptor.json");
File indexZip = new File(individualSegmentFolder, "index.zip");
Assert.assertTrue(descriptor.exists());
Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
intervalToIndexFiles.computeIfAbsent(new Interval(currTime, currTime.plusDays(1)), k -> new ArrayList<>())
.add(indexZip);
}
}
Assert.assertEquals(intervalToSegments.size(), intervalToIndexFiles.size());
segmentNum = 0;
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<DataSegment> segments = entry.getValue();
final List<File> indexFiles = intervalToIndexFiles.get(interval);
Collections.sort(segments);
indexFiles.sort(Comparator.comparing(File::getAbsolutePath));
Assert.assertNotNull(indexFiles);
Assert.assertEquals(segments.size(), indexFiles.size());
Object[][] shardInfo = shardInfoForEachSegment[segmentNum++];
for (int i = 0; i < segments.size(); i++) {
final DataSegment dataSegment = segments.get(i);
final File indexZip = indexFiles.get(i);
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
@ -644,15 +674,15 @@ public class IndexGeneratorJobTest
if (forceExtendableShardSpecs) {
NumberedShardSpec spec = (NumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals(partitionNum, spec.getPartitionNum());
Assert.assertEquals(i, spec.getPartitionNum());
Assert.assertEquals(shardInfo.length, spec.getPartitions());
} else if ("hashed".equals(partitionType)) {
Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum];
Integer[] hashShardInfo = (Integer[]) shardInfo[i];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions());
} else if ("single".equals(partitionType)) {
String[] singleDimensionShardInfo = (String[]) shardInfo[partitionNum];
String[] singleDimensionShardInfo = (String[]) shardInfo[i];
SingleDimensionShardSpec spec = (SingleDimensionShardSpec) dataSegment.getShardSpec();
Assert.assertEquals(singleDimensionShardInfo[0], spec.getStart());
Assert.assertEquals(singleDimensionShardInfo[1], spec.getEnd());
@ -662,5 +692,4 @@ public class IndexGeneratorJobTest
}
}
}
}

View File

@ -22,10 +22,11 @@ package org.apache.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -33,7 +34,6 @@ import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -46,17 +46,22 @@ public class DatasourceRecordReaderTest
@Test
public void testSanity() throws Exception
{
URL segmentDesciptor = this.getClass().getClassLoader().getResource("test-segment/descriptor.json");
DataSegment segment = HadoopDruidIndexerConfig.JSON_MAPPER
.readValue(segmentDesciptor, DataSegment.class)
.withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
);
final DataSegment segment = new DataSegment(
"testds",
Intervals.of("2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z"),
"2015-07-15T22:02:40.171Z",
ImmutableMap.of(
"type",
"local",
"path",
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NumberedShardSpec(0, 1),
9,
4096
);
InputSplit split = new DatasourceInputSplit(Collections.singletonList(WindowedDataSegment.of(segment)), null);
Configuration config = new Configuration();

View File

@ -336,9 +336,8 @@ public class RealtimeIndexTask extends AbstractTask
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip
// NOTE: (partitionNum_index.zip for HDFS data storage) and descriptor.json (partitionNum_descriptor.json for
// NOTE: HDFS data storage) to mismatch, or it can cause historical nodes to load different instances of
// NOTE: the "same" segment.
// NOTE: (partitionNum_index.zip for HDFS data storage) to mismatch, or it can cause historical nodes to load
// NOTE: different instances of the "same" segment.
final PlumberSchool plumberSchool = new RealtimePlumberSchool(
toolbox.getEmitter(),
toolbox.getQueryRunnerFactoryConglomerate(),

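The collision described in the NOTE comes from the target path depending only on configuration and the segment's identity, never on which task produced it. A rough, hypothetical sketch of such identity-only path derivation (the helper and the directory layout are illustrative, not the pusher's actual code):

import java.io.File;
import org.apache.druid.timeline.DataSegment;

class IdentityOnlyPathSketch
{
  // Two redundant realtime tasks building the "same" segment compute the same target,
  // so whichever uploads last silently overwrites the other's index.zip.
  static File indexZipTarget(File baseDir, DataSegment segment)
  {
    final String dir = segment.getDataSource()
                       + "/" + segment.getInterval().getStart() + "_" + segment.getInterval().getEnd()
                       + "/" + segment.getVersion()
                       + "/" + segment.getShardSpec().getPartitionNum();
    return new File(new File(baseDir, dir), "index.zip");
  }
}
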
View File

@ -349,8 +349,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
{
return deepStorageDir;
}
},
objectMapper
}
)
{
@Override

View File

@ -185,8 +185,7 @@ public class IndexTaskTest
{
return deepStorageDir;
}
},
jsonMapper
}
)
{
@Override

View File

@ -228,8 +228,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
{
return localDeepStorage;
}
},
getObjectMapper()
}
),
new DataSegmentKiller()
{

View File

@ -27,10 +27,8 @@ import com.google.inject.Key;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentFinder;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
@ -67,8 +65,6 @@ public class LocalDataStorageDruidModule implements DruidModule
Key.get(DataSegmentKiller.class),
Key.get(LocalDataSegmentKiller.class)
);
PolyBind.createChoice(binder, "druid.storage.type", Key.get(DataSegmentFinder.class), null);
}
private static void bindDeepStorageLocal(Binder binder)
@ -88,11 +84,6 @@ public class LocalDataStorageDruidModule implements DruidModule
.to(LocalDataSegmentPusher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentFinder.class))
.addBinding(SCHEME)
.to(LocalDataSegmentFinder.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.storage", LocalDataSegmentPusherConfig.class);
}

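The PolyBind wiring that remains above follows one pattern: declare the interface as a choice keyed on the druid.storage.type property, then register each scheme's implementation in the option map. A condensed sketch of that pattern for the killer binding (the wrapper class and method name are illustrative; "local" corresponds to the module's SCHEME constant):

import com.google.inject.Binder;
import com.google.inject.Key;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;

class StorageBindingSketch
{
  // druid.storage.type=local in runtime properties selects the "local" option below.
  static void bindKiller(Binder binder)
  {
    PolyBind.createChoice(
        binder,
        "druid.storage.type",
        Key.get(DataSegmentKiller.class),
        Key.get(LocalDataSegmentKiller.class)
    );
    PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class))
            .addBinding("local")
            .to(LocalDataSegmentKiller.class)
            .in(LazySingleton.class);
  }
}
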
View File

@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
public class LocalDataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(LocalDataSegmentFinder.class);
private final ObjectMapper mapper;
@Inject
public LocalDataSegmentFinder(ObjectMapper mapper)
{
this.mapper = mapper;
}
@Override
public Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException
{
final Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
final File workingDir = new File(workingDirPath);
if (!workingDir.isDirectory()) {
throw new SegmentLoadingException("Working directory [%s] didn't exist !?", workingDir);
}
recursiveSearchSegments(timestampedSegments, workingDir, updateDescriptor);
return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet());
}
private void recursiveSearchSegments(
Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments,
File workingDir,
boolean updateDescriptor
) throws SegmentLoadingException
{
for (File file : workingDir.listFiles()) {
if (file.isDirectory()) {
recursiveSearchSegments(timestampedSegments, file, updateDescriptor);
} else if ("descriptor.json".equals(file.getName())) {
final File indexZip = new File(file.getParentFile(), "index.zip");
if (indexZip.exists()) {
try {
final DataSegment dataSegment = mapper.readValue(FileUtils.readFileToString(file), DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getId(), indexZip.getAbsoluteFile());
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
if (!loadSpec.get("type").equals(LocalDataStorageDruidModule.SCHEME) || !loadSpec.get("path")
.equals(indexZip.getAbsoluteFile())) {
loadSpec.put("type", LocalDataStorageDruidModule.SCHEME);
loadSpec.put("path", indexZip.getAbsolutePath());
if (updateDescriptor) {
log.info(
"Updating loadSpec in descriptor.json at [%s] with new path [%s]",
file.getAbsolutePath(),
indexZip.toString()
);
FileUtils.writeStringToFile(file, mapper.writeValueAsString(dataSegment));
}
}
DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, dataSegment, indexZip.lastModified());
}
catch (IOException e) {
throw new SegmentLoadingException(
e,
"Failed to read descriptor.json for segment located at [%s]",
file.getAbsoluteFile()
);
}
} else {
throw new SegmentLoadingException(
"index.zip didn't exist at [%s] while descripter.json exists!?",
indexZip.getAbsoluteFile()
);
}
}
}
}
}

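The removed finder de-duplicated segments discovered more than once via DataSegmentFinder.putInMapRetainingNewest, keeping the copy whose index.zip was modified most recently. A minimal sketch of that keep-newest rule, assuming ties favor the later candidate (an illustration of the behavior, not the helper's exact code):

import java.util.Map;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

class KeepNewestSketch
{
  // A segment id seen twice keeps whichever copy has the more recent index.zip timestamp.
  static void putRetainingNewest(
      Map<SegmentId, Pair<DataSegment, Long>> timestampedSegments,
      DataSegment segment,
      long indexZipLastModified
  )
  {
    timestampedSegments.merge(
        segment.getId(),
        new Pair<>(segment, indexZipLastModified),
        (existing, candidate) -> candidate.rhs >= existing.rhs ? candidate : existing
    );
  }
}
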
View File

@ -19,7 +19,6 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
@ -32,8 +31,6 @@ import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.UUID;
@ -42,16 +39,13 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
private static final Logger log = new Logger(LocalDataSegmentPusher.class);
private static final String INDEX_FILENAME = "index.zip";
private static final String DESCRIPTOR_FILENAME = "descriptor.json";
private final LocalDataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public LocalDataSegmentPusher(LocalDataSegmentPusherConfig config, ObjectMapper jsonMapper)
public LocalDataSegmentPusher(LocalDataSegmentPusherConfig config)
{
this.config = config;
this.jsonMapper = jsonMapper;
log.info("Configured local filesystem as deep storage");
}
@ -84,12 +78,9 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
size += file.length();
}
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
outDir
);
return segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
}
final File tmpOutDir = new File(baseStorageDir, makeIntermediateDir());
@ -100,26 +91,17 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME);
final long size = compressSegment(dataSegmentFile, tmpIndexFile);
final File tmpDescriptorFile = new File(tmpOutDir, DESCRIPTOR_FILENAME);
DataSegment dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
tmpDescriptorFile
);
final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
FileUtils.forceMkdir(outDir);
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName());
if (!tmpIndexFile.renameTo(indexFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget);
}
return dataSegment;
}
finally {
@ -143,16 +125,4 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, dest);
return CompressionUtils.zip(dataSegmentFile, dest, true);
}
private DataSegment createDescriptorFile(DataSegment segment, File dest) throws IOException
{
log.info("Creating descriptor file at[%s]", dest);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(
dest.toPath(), jsonMapper.writeValueAsBytes(segment), StandardOpenOption.CREATE, StandardOpenOption.SYNC
);
return segment;
}
}

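With descriptor.json gone from deep storage, the loadSpec embedded in the returned DataSegment (and stored in the metadata store) is the only pointer to the pushed index.zip. For local deep storage that spec is just a two-entry map, sketched below with an illustrative helper name:

import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.Map;

class LocalLoadSpecSketch
{
  // Historicals use this map to locate and pull the segment from local deep storage.
  static Map<String, Object> localLoadSpec(File indexZip)
  {
    return ImmutableMap.of("type", "local", "path", indexZip.getAbsolutePath());
  }
}
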
View File

@ -1,257 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Set;
/**
*/
public class LocalDataSegmentFinderTest
{
private static final ObjectMapper mapper = TestHelper.makeJsonMapper();
private static final String DESCRIPTOR_JSON = "descriptor.json";
private static final String INDEX_ZIP = "index.zip";
private static final DataSegment SEGMENT_1 = DataSegment
.builder()
.dataSource("wikipedia")
.interval(Intervals.of("2013-08-31T00:00:00.000Z/2013-09-01T00:00:00.000Z"))
.version("2015-10-21T22:07:57.074Z")
.loadSpec(
ImmutableMap.of(
"type",
"local",
"path",
"/tmp/somewhere/index.zip"
)
)
.dimensions(ImmutableList.of("language", "page"))
.metrics(ImmutableList.of("count"))
.build();
private static final DataSegment SEGMENT_2 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-01T00:00:00.000Z/2013-09-02T00:00:00.000Z"))
.build();
private static final DataSegment SEGMENT_3 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.version("2015-10-22T22:07:57.074Z")
.build();
private static final DataSegment SEGMENT_4_0 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.shardSpec(new NumberedShardSpec(0, 2))
.build();
private static final DataSegment SEGMENT_4_1 = DataSegment
.builder(SEGMENT_1)
.interval(Intervals.of("2013-09-02T00:00:00.000Z/2013-09-03T00:00:00.000Z"))
.shardSpec(new NumberedShardSpec(1, 2))
.build();
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private File dataSourceDir;
private File descriptor1;
private File descriptor2;
private File descriptor3;
private File descriptor4_0;
private File descriptor4_1;
private File indexZip1;
private File indexZip2;
private File indexZip3;
private File indexZip4_0;
private File indexZip4_1;
@BeforeClass
public static void setUpStatic()
{
mapper.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
}
@Before
public void setUp() throws Exception
{
dataSourceDir = temporaryFolder.newFolder();
descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval1/v1/0", DESCRIPTOR_JSON);
descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval2/v1/0", DESCRIPTOR_JSON);
descriptor3 = new File(dataSourceDir.getAbsolutePath() + "/interval3/v2/0", DESCRIPTOR_JSON);
descriptor4_0 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/0", DESCRIPTOR_JSON);
descriptor4_1 = new File(dataSourceDir.getAbsolutePath() + "/interval4/v1/1", DESCRIPTOR_JSON);
descriptor1.getParentFile().mkdirs();
descriptor2.getParentFile().mkdirs();
descriptor3.getParentFile().mkdirs();
descriptor4_0.getParentFile().mkdirs();
descriptor4_1.getParentFile().mkdirs();
mapper.writeValue(descriptor1, SEGMENT_1);
mapper.writeValue(descriptor2, SEGMENT_2);
mapper.writeValue(descriptor3, SEGMENT_3);
mapper.writeValue(descriptor4_0, SEGMENT_4_0);
mapper.writeValue(descriptor4_1, SEGMENT_4_1);
indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP);
indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP);
indexZip3 = new File(descriptor3.getParentFile(), INDEX_ZIP);
indexZip4_0 = new File(descriptor4_0.getParentFile(), INDEX_ZIP);
indexZip4_1 = new File(descriptor4_1.getParentFile(), INDEX_ZIP);
indexZip1.createNewFile();
indexZip2.createNewFile();
indexZip3.createNewFile();
indexZip4_0.createNewFile();
indexZip4_1.createNewFile();
}
@Test
public void testFindSegments() throws SegmentLoadingException, IOException
{
final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);
final Set<DataSegment> segments = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
Assert.assertEquals(5, segments.size());
DataSegment updatedSegment1 = null;
DataSegment updatedSegment2 = null;
DataSegment updatedSegment3 = null;
DataSegment updatedSegment4_0 = null;
DataSegment updatedSegment4_1 = null;
for (DataSegment dataSegment : segments) {
if (dataSegment.getId().equals(SEGMENT_1.getId())) {
updatedSegment1 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_2.getId())) {
updatedSegment2 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_3.getId())) {
updatedSegment3 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_4_0.getId())) {
updatedSegment4_0 = dataSegment;
} else if (dataSegment.getId().equals(SEGMENT_4_1.getId())) {
updatedSegment4_1 = dataSegment;
} else {
Assert.fail("Unexpected segment");
}
}
Assert.assertEquals(descriptor1.getAbsolutePath(), getDescriptorPath(updatedSegment1));
Assert.assertEquals(descriptor2.getAbsolutePath(), getDescriptorPath(updatedSegment2));
Assert.assertEquals(descriptor3.getAbsolutePath(), getDescriptorPath(updatedSegment3));
Assert.assertEquals(descriptor4_0.getAbsolutePath(), getDescriptorPath(updatedSegment4_0));
Assert.assertEquals(descriptor4_1.getAbsolutePath(), getDescriptorPath(updatedSegment4_1));
final String serializedSegment1 = mapper.writeValueAsString(updatedSegment1);
final String serializedSegment2 = mapper.writeValueAsString(updatedSegment2);
final String serializedSegment3 = mapper.writeValueAsString(updatedSegment3);
final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0);
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
// since updateDescriptor was not enabled, descriptor.json still has stale information
Assert.assertNotEquals(serializedSegment1, FileUtils.readFileToString(descriptor1));
Assert.assertNotEquals(serializedSegment2, FileUtils.readFileToString(descriptor2));
Assert.assertNotEquals(serializedSegment3, FileUtils.readFileToString(descriptor3));
Assert.assertNotEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0));
Assert.assertNotEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));
    // enable updateDescriptor so that descriptor.json will be updated to reflect the new loadSpec
final Set<DataSegment> segments2 = localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1, FileUtils.readFileToString(descriptor1));
Assert.assertEquals(serializedSegment2, FileUtils.readFileToString(descriptor2));
Assert.assertEquals(serializedSegment3, FileUtils.readFileToString(descriptor3));
Assert.assertEquals(serializedSegment4_0, FileUtils.readFileToString(descriptor4_0));
Assert.assertEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1));
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws SegmentLoadingException
{
// remove one of index.zip while keeping its descriptor.json
indexZip4_1.delete();
final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper);
localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false);
}
@Test
public void testPreferNewestSegment() throws Exception
{
dataSourceDir = temporaryFolder.newFolder();
descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval10/v10/0/older", DESCRIPTOR_JSON);
descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval10/v10/0/newer", DESCRIPTOR_JSON);
descriptor1.getParentFile().mkdirs();
descriptor2.getParentFile().mkdirs();
mapper.writeValue(descriptor1, SEGMENT_1);
mapper.writeValue(descriptor2, SEGMENT_1);
indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP);
indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP);
FileOutputStream fos1 = new FileOutputStream(indexZip1);
fos1.getFD().sync();
fos1.close();
Thread.sleep(1000);
FileOutputStream fos2 = new FileOutputStream(indexZip2);
fos2.getFD().sync();
fos2.close();
final Set<DataSegment> segments = new LocalDataSegmentFinder(mapper).findSegments(
dataSourceDir.getAbsolutePath(), false
);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(indexZip2.getAbsolutePath(), segments.iterator().next().getLoadSpec().get("path"));
}
private String getDescriptorPath(DataSegment segment)
{
final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path")));
return indexzip.getParent() + "/" + DESCRIPTOR_JSON;
}
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
@ -27,7 +26,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.druid.java.util.common.CompressionUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
@ -80,7 +78,7 @@ public class LocalDataSegmentPusherTest
{
config = new LocalDataSegmentPusherConfig();
config.storageDirectory = temporaryFolder.newFolder();
localDataSegmentPusher = new LocalDataSegmentPusher(config, TestHelper.makeJsonMapper());
localDataSegmentPusher = new LocalDataSegmentPusher(config);
dataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(dataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x9));
}
@ -113,9 +111,7 @@ public class LocalDataSegmentPusherTest
localDataSegmentPusher.getStorageDir(returnSegment, false)
);
File versionFile = new File(outDir, "index.zip");
File descriptorJson = new File(outDir, "descriptor.json");
Assert.assertTrue(versionFile.exists());
Assert.assertTrue(descriptorJson.exists());
}
}
@ -171,7 +167,7 @@ public class LocalDataSegmentPusherTest
Assert.assertEquals(
"file:/druid",
new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop()
new LocalDataSegmentPusher(config).getPathForHadoop()
);
}
@ -182,7 +178,7 @@ public class LocalDataSegmentPusherTest
Assert.assertEquals(
StringUtils.format("file:%s/druid", System.getProperty("user.dir")),
new LocalDataSegmentPusher(config, new ObjectMapper()).getPathForHadoop()
new LocalDataSegmentPusher(config).getPathForHadoop()
);
}
}

View File

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentFinder;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.DruidNode;
import org.apache.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
*/
@Command(
name = "insert-segment-to-db",
description = "insert a segment into metadata storage"
)
public class InsertSegment extends GuiceRunnable
{
private static final Logger log = new Logger(InsertSegment.class);
@Option(name = "--workingDir", description = "The directory path where segments are stored. This tool will recursively look for segments underneath this directory and insert/update these segments in metdata storage.", required = true)
private String workingDirPath;
@Option(name = "--updateDescriptor", description = "if set to true, this tool will update loadSpec field in descriptor.json (partitionNum_descriptor.json for HDFS data storage) if the path in loadSpec is different from where desciptor.json (partitionNum_descriptor.json for HDFS data storage) was found. Default value is true", required = false)
private String updateDescriptor;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
public InsertSegment()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.of(
// It's unknown if those modules are required in InsertSegment.
// Maybe some of those modules could be removed.
// See https://github.com/apache/incubator-druid/pull/4429#discussion_r123603498
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
binder -> JsonConfigProvider.bindInstance(
binder,
Key.get(DruidNode.class, Self.class),
new DruidNode("tools", "localhost", false, -1, null, true, false)
)
);
}
@Override
public void run()
{
final Injector injector = makeInjector();
indexerMetadataStorageCoordinator = injector.getInstance(IndexerMetadataStorageCoordinator.class);
final DataSegmentFinder dataSegmentFinder = injector.getInstance(DataSegmentFinder.class);
log.info("Start searching segments under [%s]", workingDirPath);
Set<DataSegment> segments = null;
try {
segments = dataSegmentFinder.findSegments(workingDirPath, Boolean.valueOf(updateDescriptor));
}
catch (SegmentLoadingException e) {
Throwables.propagate(e);
}
log.info(
"Done searching segments under [%s], [%d] segments were found",
workingDirPath,
segments.size()
);
try {
insertSegments(segments);
}
catch (IOException e) {
Throwables.propagate(e);
}
log.info("Done processing [%d] segments", segments.size());
}
private void insertSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> segmentsInserted = indexerMetadataStorageCoordinator.announceHistoricalSegments(segments);
for (DataSegment dataSegment : segmentsInserted) {
log.info("Sucessfully inserted Segment [%s] into metadata storage", dataSegment.getId());
}
final Set<DataSegment> segmentsAlreadyExist = Sets.difference(segments, segmentsInserted);
if (!segmentsAlreadyExist.isEmpty()) {
for (DataSegment dataSegment : segmentsAlreadyExist) {
log.info("Segment [%s] already exists in metadata storage, updating the payload", dataSegment.getId());
}
indexerMetadataStorageCoordinator.updateSegmentMetadata(segmentsAlreadyExist);
}
}
}

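The run() method of the removed command boiled down to a two-step metadata flow: announce newly found segments, then update the payload of any that already existed. A condensed sketch of that flow (the coordinator calls match the removed code; the wrapper class is illustrative):

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Set;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.timeline.DataSegment;

class InsertSegmentsFlowSketch
{
  // Newly discovered segments are announced; segments already present in metadata
  // storage get their payload updated instead.
  static void insert(IndexerMetadataStorageCoordinator coordinator, Set<DataSegment> found) throws IOException
  {
    final Set<DataSegment> inserted = coordinator.announceHistoricalSegments(found);
    final Set<DataSegment> alreadyExisting = Sets.difference(found, inserted);
    if (!alreadyExisting.isEmpty()) {
      coordinator.updateSegmentMetadata(alreadyExisting);
    }
  }
}
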
View File

@ -78,7 +78,6 @@ public class Main
DruidJsonValidator.class,
PullDependencies.class,
CreateTables.class,
InsertSegment.class,
DumpSegment.class,
ResetCluster.class,
ValidateSegments.class