Add tool for migrating from local deep storage/Derby metadata (#7598)

* Add tool for migrating from local deep storage/Derby metadata

* Split deep storage and metadata migration docs

* Support import into Derby

* Fix create tables cmd

* Fix create tables cmd

* Fix commands

* PR comment

* Add -p
This commit is contained in:
Jonathan Wei 2019-05-06 23:39:40 -07:00 committed by GitHub
parent b3c7463059
commit dadf6a2f11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 957 additions and 1 deletions

View File

@ -58,6 +58,14 @@ public interface MetadataStorageConnector
throw new UnsupportedOperationException("compareAndSwap is not implemented.");
}
default void exportTable(
String tableName,
String outputPath
)
{
throw new UnsupportedOperationException("exportTable is not implemented.");
}
void createDataSourceTable();
void createPendingSegmentsTable();

View File

@ -114,6 +114,17 @@ public interface DataSegmentPusher
);
}
static String getDefaultStorageDirWithExistingUniquePath(DataSegment segment, String uniquePath)
{
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum(),
uniquePath
);
}
static String generateUniquePath()
{
return UUID.randomUUID().toString();

View File

@ -0,0 +1,66 @@
---
layout: doc_page
title: "Deep Storage Migration"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Deep Storage Migration
If you have been running an evaluation Druid cluster using local deep storage and wish to migrate to a
more production-capable deep storage system such as S3 or HDFS, this document describes the necessary steps.
Migration of deep storage involves the following steps at a high level:
- Copying segments from local deep storage to the new deep storage
- Exporting Druid's segments table from metadata
- Rewriting the load specs in the exported segment data to reflect the new deep storage location
- Reimporting the edited segments into metadata
## Shut down cluster services
To ensure a clean migration, shut down the non-coordinator services to ensure that metadata state will not
change as you do the migration.
When migrating from Derby, the coordinator processes will still need to be up initially, as they host the Derby database.
## Copy segments from old deep storage to new deep storage.
Before migrating, you will need to copy your old segments to the new deep storage.
For information on what path structure to use in the new deep storage, please see [deep storage migration options](../operations/export-metadata.html#deep-storage-migration).
## Export segments with rewritten load specs
Druid provides an [Export Metadata Tool](../operations/export-metadata.html) for exporting metadata from Derby into CSV files
which can then be reimported.
By setting [deep storage migration options](../operations/export-metadata.html#deep-storage-migration), the `export-metadata` tool will export CSV files where the segment load specs have been rewritten to load from your new deep storage location.
Run the `export-metadata` tool on your existing cluster, using the migration options appropriate for your new deep storage location, and save the CSV files it generates. After a successful export, you can shut down the coordinator.
### Import metadata
After generating the CSV exports with the modified segment data, you can reimport the contents of the Druid segments table from the generated CSVs.
Please refer to [import commands](../operations/export-metadata.html#importing-metadata) for examples. Only the `druid_segments` table needs to be imported.
### Restart cluster
After importing the segment table successfully, you can now restart your cluster.

View File

@ -0,0 +1,201 @@
---
layout: doc_page
title: "Export Metadata Tool"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Export Metadata Tool
Druid includes an `export-metadata` tool for assisting with migration of cluster metadata and deep storage.
This tool exports the contents of the following Druid metadata tables:
- segments
- rules
- config
- datasource
- supervisors
Additionally, the tool can rewrite the local deep storage location descriptors in the rows of the segments table
to point to new deep storage locations (S3, HDFS, and local rewrite paths are supported).
The tool has the following limitations:
- Only exporting from Derby metadata is currently supported
- If rewriting load specs for deep storage migration, only migrating from local deep storage is currently supported.
## `export-metadata` Options
The `export-metadata` tool provides the following options:
### Connection Properties
`--connectURI`: The URI of the Derby database, e.g. `jdbc:derby://localhost:1527/var/druid/metadata.db;create=true`
`--user`: Username
`--password`: Password
`--base`: corresponds to the value of `druid.metadata.storage.tables.base` in the configuration, `druid` by default.
### Output Path
`--output-path`, `-o`: The output directory of the tool. CSV files for the Druid segments, rules, config, datasource, and supervisors tables will be written to this directory.
### Export Format Options
`--use-hex-blobs`, `-x`: If set, export BLOB payload columns as hexadecimal strings. This needs to be set if importing back into Derby. Default is false.
`--booleans-as-strings`, `-t`: If set, write boolean values as "true" or "false" instead of "1" and "0". This needs to be set if importing back into Derby. Default is false.
### Deep Storage Migration
#### Migration to S3 Deep Storage
By setting the options below, the tool will rewrite the segment load specs to point to a new S3 deep storage location.
This helps users migrate segments stored in local deep storage to S3.
`--s3bucket`, `-b`: The S3 bucket that will hold the migrated segments
`--s3baseKey`, `-k`: The base S3 key where the migrated segments will be stored
When copying the local deep storage segments to S3, the rewrite performed by this tool requires that the directory structure of the segments be unchanged.
For example, if the cluster had the following local deep storage configuration:
```
druid.storage.type=local
druid.storage.storageDirectory=/druid/segments
```
If the target S3 bucket was `migration`, with a base key of `example`, the contents of `s3://migration/example/` must be identical to that of `/druid/segments` on the old local filesystem.
#### Migration to HDFS Deep Storage
By setting the options below, the tool will rewrite the segment load specs to point to a new HDFS deep storage location.
This helps users migrate segments stored in local deep storage to HDFS.
`--hadoopStorageDirectory`, `-h`: The HDFS path that will hold the migrated segments
When copying the local deep storage segments to HDFS, the rewrite performed by this tool requires that the directory structure of the segments be unchanged, with the exception of directory names containing colons (`:`).
For example, if the cluster had the following local deep storage configuration:
```
druid.storage.type=local
druid.storage.storageDirectory=/druid/segments
```
If the target hadoopStorageDirectory was `/migration/example`, the contents of `hdfs:///migration/example/` must be identical to that of `/druid/segments` on the old local filesystem.
Additionally, the segments paths in local deep storage contain colons(`:`) in their names, e.g.:
`wikipedia/2016-06-27T02:00:00.000Z_2016-06-27T03:00:00.000Z/2019-05-03T21:57:15.950Z/1/index.zip`
HDFS cannot store files containing colons, and this tool expects the colons to be replaced with underscores (`_`) in HDFS.
In this example, the `wikipedia` segment above under `/druid/segments` in local deep storage would need to be migrated to HDFS under `hdfs:///migration/example/` with the following path:
`wikipedia/2016-06-27T02_00_00.000Z_2016-06-27T03_00_00.000Z/2019-05-03T21_57_15.950Z/1/index.zip`
#### Migration to New Local Deep Storage Path
By setting the options below, the tool will rewrite the segment load specs to point to a new local deep storage location.
This helps users migrate segments stored in local deep storage to a new path (e.g., a new NFS mount).
`--newLocalPath`, `-n`: The new path on the local filesystem that will hold the migrated segments
When copying the local deep storage segments to a new path, the rewrite performed by this tool requires that the directory structure of the segments be unchanged.
For example, if the cluster had the following local deep storage configuration:
```
druid.storage.type=local
druid.storage.storageDirectory=/druid/segments
```
If the new path was `/migration/example`, the contents of `/migration/example/` must be identical to that of `/druid/segments` on the local filesystem.
## Running the tool
To use the tool, you can run the following from the root of the Druid package:
```bash
cd ${DRUID_ROOT}
mkdir -p /tmp/csv
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[] org.apache.druid.cli.Main tools export-metadata --connectURI "jdbc:derby://localhost:1527/var/druid/metadata.db;" -o /tmp/csv
```
In the example command above:
- `lib` is the the Druid lib directory
- `extensions` is the Druid extensions directory
- `/tmp/csv` is the output directory. Please make sure that this directory exists.
## Importing Metadata
After running the tool, the output directory will contain `<table-name>_raw.csv` and `<table-name>.csv` files.
The `<table-name>_raw.csv` files are intermediate files used by the tool, containing the table data as exported by Derby without modification.
The `<table-name>.csv` files are used for import into another database such as MySQL and PostgreSQL and have any configured deep storage location rewrites applied.
Example import commands for Derby, MySQL, and PostgreSQL are shown below.
These example import commands expect `/tmp/csv` and its contents to be accessible from the server. For other options, such as importing from the client filesystem, please refer to the database's documentation.
### Derby
```sql
CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_SEGMENTS','/tmp/csv/druid_segments.csv',',','"',null,0);
CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_RULES','/tmp/csv/druid_rules.csv',',','"',null,0);
CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_CONFIG','/tmp/csv/druid_config.csv',',','"',null,0);
CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_DATASOURCE','/tmp/csv/druid_dataSource.csv',',','"',null,0);
CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_SUPERVISORS','/tmp/csv/druid_supervisors.csv',',','"',null,0);
```
### MySQL
```sql
LOAD DATA INFILE '/tmp/csv/druid_segments.csv' INTO TABLE druid_segments FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (id,dataSource,created_date,start,end,partitioned,version,used,payload); SHOW WARNINGS;
LOAD DATA INFILE '/tmp/csv/druid_rules.csv' INTO TABLE druid_rules FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (id,dataSource,version,payload); SHOW WARNINGS;
LOAD DATA INFILE '/tmp/csv/druid_config.csv' INTO TABLE druid_config FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (name,payload); SHOW WARNINGS;
LOAD DATA INFILE '/tmp/csv/druid_dataSource.csv' INTO TABLE druid_dataSource FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (dataSource,created_date,commit_metadata_payload,commit_metadata_sha1); SHOW WARNINGS;
LOAD DATA INFILE '/tmp/csv/druid_supervisors.csv' INTO TABLE druid_supervisors FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (id,spec_id,created_date,payload); SHOW WARNINGS;
```
### PostgreSQL
```sql
COPY druid_segments(id,dataSource,created_date,start,"end",partitioned,version,used,payload) FROM '/tmp/csv/druid_segments.csv' DELIMITER ',' CSV;
COPY druid_rules(id,dataSource,version,payload) FROM '/tmp/csv/druid_rules.csv' DELIMITER ',' CSV;
COPY druid_config(name,payload) FROM '/tmp/csv/druid_config.csv' DELIMITER ',' CSV;
COPY druid_dataSource(dataSource,created_date,commit_metadata_payload,commit_metadata_sha1) FROM '/tmp/csv/druid_dataSource.csv' DELIMITER ',' CSV;
COPY druid_supervisors(id,spec_id,created_date,payload) FROM '/tmp/csv/druid_supervisors.csv' DELIMITER ',' CSV;
```

View File

@ -0,0 +1,92 @@
---
layout: doc_page
title: "Metadata Migration"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Metadata Migration
If you have been running an evaluation Druid cluster using the built-in Derby metadata storage and wish to migrate to a
more production-capable metadata store such as MySQL or PostgreSQL, this document describes the necessary steps.
## Shut down cluster services
To ensure a clean migration, shut down the non-coordinator services to ensure that metadata state will not
change as you do the migration.
When migrating from Derby, the coordinator processes will still need to be up initially, as they host the Derby database.
## Exporting metadata
Druid provides an [Export Metadata Tool](../operations/export-metadata.html) for exporting metadata from Derby into CSV files
which can then be imported into your new metadata store.
The tool also provides options for rewriting the deep storage locations of segments; this is useful
for [deep storage migration](../operations/deep-storage-migration.html).
Run the `export-metadata` tool on your existing cluster, and save the CSV files it generates. After a successful export, you can shut down the coordinator.
## Initializing the new metadata store
### Create database
Before importing the existing cluster metadata, you will need to set up the new metadata store.
The [MySQL extension](../development/extensions-core/mysql.html) and [PostgreSQL extension](../development/extensions-core/postgresql.html) docs have instructions for initial database setup.
### Update configuration
Update your Druid runtime properties with the new metadata configuration.
### Create Druid tables
Druid provides a `metadata-init` tool for creating Druid's metadata tables. After initializing the Druid database, you can run the commands shown below from the root of the Druid package to initialize the tables.
In the example commands below:
- `lib` is the the Druid lib directory
- `extensions` is the Druid extensions directory
- `base` corresponds to the value of `druid.metadata.storage.tables.base` in the configuration, `druid` by default.
- The `--connectURI` parameter corresponds to the value of `druid.metadata.storage.connector.connectURI`.
- The `--user` parameter corresponds to the value of `druid.metadata.storage.connector.user`.
- The `--password` parameter corresponds to the value of `druid.metadata.storage.connector.password`.
#### MySQL
```bash
cd ${DRUID_ROOT}
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"mysql-metadata-storage\"] -Ddruid.metadata.storage.type=mysql org.apache.druid.cli.Main tools metadata-init --connectURI="<mysql-uri>" --user <user> --password <pass> --base druid
```
#### PostgreSQL
```bash
cd ${DRUID_ROOT}
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"postgresql-metadata-storage\"] -Ddruid.metadata.storage.type=postgresql org.apache.druid.cli.Main tools metadata-init --connectURI="<postgresql-uri>" --user <user> --password <pass> --base druid
```
### Import metadata
After initializing the tables, please refer to the [import commands](../operations/export-metadata.html#importing-metadata) for your target database.
### Restart cluster
After importing the metadata successfully, you can now restart your cluster.

View File

@ -33,6 +33,7 @@ import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
@ManageLifecycle
public class DerbyConnector extends SQLMetadataConnector
@ -113,6 +114,31 @@ public class DerbyConnector extends SQLMetadataConnector
return "VALUES 1";
}
@Override
public void exportTable(
String tableName,
String outputPath
)
{
retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle.createStatement(
StringUtils.format(
"CALL SYSCS_UTIL.SYSCS_EXPORT_TABLE (null, '%s', '%s', null, null, null)",
tableName,
outputPath
)
).execute();
return null;
}
}
);
}
@LifecycleStart
public void start()
{

View File

@ -0,0 +1,551 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.opencsv.CSVParser;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.server.DruidNode;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import javax.xml.bind.DatatypeConverter;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Command(
name = "export-metadata",
description = "Exports the contents of a Druid Derby metadata store to CSV files to assist with cluster migration. This tool also provides the ability to rewrite segment locations in the Derby metadata to assist with deep storage migration."
)
public class ExportMetadata extends GuiceRunnable
{
@Option(name = "--connectURI", description = "Database JDBC connection string", required = true)
private String connectURI;
@Option(name = "--user", description = "Database username")
private String user = null;
@Option(name = "--password", description = "Database password")
private String password = null;
@Option(name = "--base", description = "Base table name")
private String base = "druid";
@Option(
name = {"-b", "--s3bucket"},
title = "s3bucket",
description = "S3 bucket of the migrated segments",
required = false)
public String s3Bucket = null;
@Option(
name = {"-k", "--s3baseKey"},
title = "s3baseKey",
description = "S3 baseKey of the migrated segments",
required = false)
public String s3baseKey = null;
@Option(
name = {"-h", "--hadoopStorageDirectory"},
title = "hadoopStorageDirectory",
description = "hadoopStorageDirectory of the migrated segments",
required = false)
public String hadoopStorageDirectory = null;
@Option(
name = {"-n", "--newLocalPath"},
title = "newLocalPath",
description = "newLocalPath of the migrated segments",
required = false)
public String newLocalPath = null;
@Option(
name = {"-o", "--output-path"},
title = "output-path",
description = "CSV output path",
required = false)
public String outputPath = null;
@Option(
name = {"-x", "--use-hex-blobs"},
title = "use-hex-blobs",
description = "Write BLOB payloads as hex strings",
required = false)
public boolean useHexBlobs = false;
@Option(
name = {"-t", "--booleans-as-strings"},
title = "booleans-as-strings",
description = "Write boolean values as true/false strings instead of 1/0",
required = false)
public boolean booleansAsStrings = false;
private static final Logger log = new Logger(ExportMetadata.class);
private static final CSVParser parser = new CSVParser();
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
public ExportMetadata()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.of(
// This area is copied from CreateTables.
// It's unknown why those modules are required in CreateTables, and if all of those modules are required or not.
// Maybe some of those modules could be removed.
// See https://github.com/apache/incubator-druid/pull/4429#discussion_r123602930
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
binder -> {
JsonConfigProvider.bindInstance(
binder,
Key.get(MetadataStorageConnectorConfig.class),
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return connectURI;
}
@Override
public String getUser()
{
return user;
}
@Override
public String getPassword()
{
return password;
}
}
);
JsonConfigProvider.bindInstance(
binder,
Key.get(MetadataStorageTablesConfig.class),
MetadataStorageTablesConfig.fromBase(base)
);
JsonConfigProvider.bindInstance(
binder,
Key.get(DruidNode.class, Self.class),
new DruidNode("tools", "localhost", false, -1, null, true, false)
);
}
);
}
@Override
public void run()
{
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(ObjectMapper.class, jsonMapper);
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
jsonMapper.setInjectableValues(injectableValues);
if (hadoopStorageDirectory != null && newLocalPath != null) {
throw new IllegalArgumentException(
"Only one of s3Bucket, hadoopStorageDirectory, and newLocalPath can be set."
);
}
if (s3Bucket != null && (hadoopStorageDirectory != null || newLocalPath != null)) {
throw new IllegalArgumentException(
"Only one of s3Bucket, hadoopStorageDirectory, and newLocalPath can be set."
);
}
if (s3Bucket != null && s3baseKey == null) {
throw new IllegalArgumentException("s3baseKey must be set if s3Bucket is set.");
}
final Injector injector = makeInjector();
SQLMetadataConnector dbConnector = injector.getInstance(SQLMetadataConnector.class);
MetadataStorageTablesConfig metadataStorageTablesConfig = injector.getInstance(MetadataStorageTablesConfig.class);
// We export a raw CSV first, and then apply some conversions for easier imports:
// Boolean strings are rewritten as 1 and 0
// hexadecimal BLOB columns are rewritten with rewriteHexPayloadAsEscapedJson()
log.info("Exporting datasource table: " + metadataStorageTablesConfig.getDataSourceTable());
exportTable(dbConnector, metadataStorageTablesConfig.getDataSourceTable(), true);
rewriteDatasourceExport(metadataStorageTablesConfig.getDataSourceTable());
log.info("Exporting segments table: " + metadataStorageTablesConfig.getSegmentsTable());
exportTable(dbConnector, metadataStorageTablesConfig.getSegmentsTable(), true);
rewriteSegmentsExport(metadataStorageTablesConfig.getSegmentsTable());
log.info("Exporting rules table: " + metadataStorageTablesConfig.getRulesTable());
exportTable(dbConnector, metadataStorageTablesConfig.getRulesTable(), true);
rewriteRulesExport(metadataStorageTablesConfig.getRulesTable());
log.info("Exporting config table: " + metadataStorageTablesConfig.getConfigTable());
exportTable(dbConnector, metadataStorageTablesConfig.getConfigTable(), true);
rewriteConfigExport(metadataStorageTablesConfig.getConfigTable());
log.info("Exporting supervisor table: " + metadataStorageTablesConfig.getSupervisorTable());
exportTable(dbConnector, metadataStorageTablesConfig.getSupervisorTable(), true);
rewriteSupervisorExport(metadataStorageTablesConfig.getSupervisorTable());
}
private void exportTable(
SQLMetadataConnector dbConnector,
String tableName,
boolean withRawFilename
)
{
String pathFormatString;
if (withRawFilename) {
pathFormatString = "%s/%s_raw.csv";
} else {
pathFormatString = "%s/%s.csv";
}
dbConnector.exportTable(
StringUtils.toUpperCase(tableName),
StringUtils.format(pathFormatString, outputPath, tableName)
);
}
private void rewriteDatasourceExport(
String datasourceTableName
)
{
String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, datasourceTableName);
String outFile = StringUtils.format("%s/%s.csv", outputPath, datasourceTableName);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8)
);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8);
) {
String line;
while ((line = reader.readLine()) != null) {
String[] parsed = parser.parseLine(line);
StringBuilder newLineBuilder = new StringBuilder();
newLineBuilder.append(parsed[0]).append(","); //dataSource
newLineBuilder.append(parsed[1]).append(","); //created_date
newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[2])).append(","); //commit_metadata_payload
newLineBuilder.append(parsed[3]); //commit_metadata_sha1
newLineBuilder.append("\n");
writer.write(newLineBuilder.toString());
}
}
catch (IOException ioex) {
throw new RuntimeException(ioex);
}
}
private void rewriteRulesExport(
String rulesTableName
)
{
String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, rulesTableName);
String outFile = StringUtils.format("%s/%s.csv", outputPath, rulesTableName);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8)
);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8);
) {
String line;
while ((line = reader.readLine()) != null) {
String[] parsed = parser.parseLine(line);
StringBuilder newLineBuilder = new StringBuilder();
newLineBuilder.append(parsed[0]).append(","); //id
newLineBuilder.append(parsed[1]).append(","); //dataSource
newLineBuilder.append(parsed[2]).append(","); //version
newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[3])); //payload
newLineBuilder.append("\n");
writer.write(newLineBuilder.toString());
}
}
catch (IOException ioex) {
throw new RuntimeException(ioex);
}
}
private void rewriteConfigExport(
String configTableName
)
{
String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, configTableName);
String outFile = StringUtils.format("%s/%s.csv", outputPath, configTableName);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8)
);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8);
) {
String line;
while ((line = reader.readLine()) != null) {
String[] parsed = parser.parseLine(line);
StringBuilder newLineBuilder = new StringBuilder();
newLineBuilder.append(parsed[0]).append(","); //name
newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[1])); //payload
newLineBuilder.append("\n");
writer.write(newLineBuilder.toString());
}
}
catch (IOException ioex) {
throw new RuntimeException(ioex);
}
}
private void rewriteSupervisorExport(
String supervisorTableName
)
{
String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, supervisorTableName);
String outFile = StringUtils.format("%s/%s.csv", outputPath, supervisorTableName);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8)
);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8);
) {
String line;
while ((line = reader.readLine()) != null) {
String[] parsed = parser.parseLine(line);
StringBuilder newLineBuilder = new StringBuilder();
newLineBuilder.append(parsed[0]).append(","); //id
newLineBuilder.append(parsed[1]).append(","); //spec_id
newLineBuilder.append(parsed[2]).append(","); //created_date
newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[3])); //payload
newLineBuilder.append("\n");
writer.write(newLineBuilder.toString());
}
}
catch (IOException ioex) {
throw new RuntimeException(ioex);
}
}
private void rewriteSegmentsExport(
String segmentsTableName
)
{
String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, segmentsTableName);
String outFile = StringUtils.format("%s/%s.csv", outputPath, segmentsTableName);
try (
BufferedReader reader = new BufferedReader(
new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8)
);
OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8);
) {
String line;
while ((line = reader.readLine()) != null) {
String[] parsed = parser.parseLine(line);
StringBuilder newLineBuilder = new StringBuilder();
newLineBuilder.append(parsed[0]).append(","); //id
newLineBuilder.append(parsed[1]).append(","); //dataSource
newLineBuilder.append(parsed[2]).append(","); //created_date
newLineBuilder.append(parsed[3]).append(","); //start
newLineBuilder.append(parsed[4]).append(","); //end
newLineBuilder.append(convertBooleanString(parsed[5])).append(","); //partitioned
newLineBuilder.append(parsed[6]).append(","); //version
newLineBuilder.append(convertBooleanString(parsed[7])).append(","); //used
if (s3Bucket != null || hadoopStorageDirectory != null || newLocalPath != null) {
newLineBuilder.append(makePayloadWithConvertedLoadSpec(parsed[8]));
} else {
newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[8])); //payload
}
newLineBuilder.append("\n");
writer.write(newLineBuilder.toString());
}
}
catch (IOException ioex) {
throw new RuntimeException(ioex);
}
}
/**
* Returns a new load spec in escaped JSON form, with the new deep storage location if configured.
*/
private String makePayloadWithConvertedLoadSpec(
String payload
) throws IOException
{
DataSegment segment = jsonMapper.readValue(DatatypeConverter.parseHexBinary(payload), DataSegment.class);
String uniqueId = getUniqueIDFromLocalLoadSpec(segment.getLoadSpec());
String segmentPath = DataSegmentPusher.getDefaultStorageDirWithExistingUniquePath(segment, uniqueId);
Map<String, Object> newLoadSpec = null;
if (s3Bucket != null) {
newLoadSpec = makeS3LoadSpec(segmentPath);
} else if (hadoopStorageDirectory != null) {
newLoadSpec = makeHDFSLoadSpec(segmentPath);
} else if (newLocalPath != null) {
newLoadSpec = makeLocalLoadSpec(segmentPath);
}
if (newLoadSpec != null) {
segment = new DataSegment(
segment.getDataSource(),
segment.getInterval(),
segment.getVersion(),
newLoadSpec,
segment.getDimensions(),
segment.getMetrics(),
segment.getShardSpec(),
segment.getBinaryVersion(),
segment.getSize()
);
}
String serialized = jsonMapper.writeValueAsString(segment);
if (useHexBlobs) {
return DatatypeConverter.printHexBinary(StringUtils.toUtf8(serialized));
} else {
return escapeJSONForCSV(serialized);
}
}
/**
* Derby's export tool writes BLOB columns as a hexadecimal string:
* https://db.apache.org/derby/docs/10.9/adminguide/cadminimportlobs.html
*
* Decodes the hex string and escapes the decoded JSON.
*/
private String rewriteHexPayloadAsEscapedJson(
String payload
)
{
if (useHexBlobs) {
return payload;
}
String json = StringUtils.fromUtf8(DatatypeConverter.parseHexBinary(payload));
return escapeJSONForCSV(json);
}
private String convertBooleanString(String booleanString)
{
if (booleansAsStrings) {
return booleanString;
} else {
return "true".equals(booleanString) ? "1" : "0";
}
}
private String escapeJSONForCSV(String json)
{
return "\"" + StringUtils.replace(json, "\"", "\"\"") + "\"";
}
private Map<String, Object> makeS3LoadSpec(
String segmentPath
)
{
return ImmutableMap.of(
"type", "s3_zip",
"bucket", s3Bucket,
"key", StringUtils.format("%s/%s/index.zip", s3baseKey, segmentPath)
);
}
/**
* Makes an HDFS spec, replacing colons with underscores. HDFS doesn't support colons in filenames.
*/
private Map<String, Object> makeHDFSLoadSpec(
String segmentPath
)
{
return ImmutableMap.of(
"type", "hdfs",
"path", StringUtils.format("%s/%s/index.zip", hadoopStorageDirectory, segmentPath.replace(':', '_'))
);
}
private Map<String, Object> makeLocalLoadSpec(
String segmentPath
)
{
return ImmutableMap.of(
"type", "local",
"path", StringUtils.format("%s/%s/index.zip", newLocalPath, segmentPath)
);
}
/**
* Looks for an optional unique path component in the segment path.
* The unique path is used for segments created by realtime indexing tasks like Kafka.
*/
@Nullable
private String getUniqueIDFromLocalLoadSpec(
Map<String, Object> localLoadSpec
)
{
String[] splits = ((String) localLoadSpec.get("path")).split("/");
if (splits.length < 2) {
return null;
}
String maybeUUID = splits[splits.length - 2];
try {
UUID.fromString(maybeUUID);
return maybeUUID;
}
catch (IllegalArgumentException iae) {
return null;
}
}
}

View File

@ -80,7 +80,8 @@ public class Main
CreateTables.class,
DumpSegment.class,
ResetCluster.class,
ValidateSegments.class
ValidateSegments.class,
ExportMetadata.class
);
builder.withGroup("tools")
.withDescription("Various tools for working with Druid")