diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java index bde1d49c06a..7f6df4213a2 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java @@ -58,6 +58,14 @@ public interface MetadataStorageConnector throw new UnsupportedOperationException("compareAndSwap is not implemented."); } + default void exportTable( + String tableName, + String outputPath + ) + { + throw new UnsupportedOperationException("exportTable is not implemented."); + } + void createDataSourceTable(); void createPendingSegmentsTable(); diff --git a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java index a1da4f89ec3..7e9eb9234b7 100644 --- a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java +++ b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java @@ -114,6 +114,17 @@ public interface DataSegmentPusher ); } + static String getDefaultStorageDirWithExistingUniquePath(DataSegment segment, String uniquePath) + { + return JOINER.join( + segment.getDataSource(), + StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()), + segment.getVersion(), + segment.getShardSpec().getPartitionNum(), + uniquePath + ); + } + static String generateUniquePath() { return UUID.randomUUID().toString(); diff --git a/docs/content/operations/deep-storage-migration.md b/docs/content/operations/deep-storage-migration.md new file mode 100644 index 00000000000..3fc61e7c0d2 --- /dev/null +++ b/docs/content/operations/deep-storage-migration.md @@ -0,0 +1,66 @@ +--- +layout: doc_page +title: "Deep Storage Migration" +--- + + + +# Deep Storage Migration + +If you have been running an evaluation Druid cluster using local deep storage and wish to migrate to a +more production-capable deep storage system such as S3 or HDFS, this document describes the necessary steps. + +Migration of deep storage involves the following steps at a high level: +- Copying segments from local deep storage to the new deep storage +- Exporting Druid's segments table from metadata +- Rewriting the load specs in the exported segment data to reflect the new deep storage location +- Reimporting the edited segments into metadata + +## Shut down cluster services + +To ensure a clean migration, shut down the non-coordinator services to ensure that metadata state will not +change as you do the migration. + +When migrating from Derby, the coordinator processes will still need to be up initially, as they host the Derby database. + +## Copy segments from old deep storage to new deep storage. + +Before migrating, you will need to copy your old segments to the new deep storage. + +For information on what path structure to use in the new deep storage, please see [deep storage migration options](../operations/export-metadata.html#deep-storage-migration). + +## Export segments with rewritten load specs + +Druid provides an [Export Metadata Tool](../operations/export-metadata.html) for exporting metadata from Derby into CSV files +which can then be reimported. 
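+
+For example, assuming a migration to S3 and reusing the placeholder bucket `migration` and base key `example` from that tool's documentation, the export step might look like the following sketch:
+
+```bash
+cd ${DRUID_ROOT}
+mkdir -p /tmp/csv
+java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[] org.apache.druid.cli.Main tools export-metadata --connectURI "jdbc:derby://localhost:1527/var/druid/metadata.db;" -o /tmp/csv -b migration -k example
+```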
+ +By setting [deep storage migration options](../operations/export-metadata.html#deep-storage-migration), the `export-metadata` tool will export CSV files where the segment load specs have been rewritten to load from your new deep storage location. + +Run the `export-metadata` tool on your existing cluster, using the migration options appropriate for your new deep storage location, and save the CSV files it generates. After a successful export, you can shut down the coordinator. + +### Import metadata + +After generating the CSV exports with the modified segment data, you can reimport the contents of the Druid segments table from the generated CSVs. + +Please refer to [import commands](../operations/export-metadata.html#importing-metadata) for examples. Only the `druid_segments` table needs to be imported. + +### Restart cluster + +After importing the segment table successfully, you can now restart your cluster. diff --git a/docs/content/operations/export-metadata.md b/docs/content/operations/export-metadata.md new file mode 100644 index 00000000000..11c0f76e4d8 --- /dev/null +++ b/docs/content/operations/export-metadata.md @@ -0,0 +1,201 @@ +--- +layout: doc_page +title: "Export Metadata Tool" +--- + + + +# Export Metadata Tool + +Druid includes an `export-metadata` tool for assisting with migration of cluster metadata and deep storage. + +This tool exports the contents of the following Druid metadata tables: +- segments +- rules +- config +- datasource +- supervisors + +Additionally, the tool can rewrite the local deep storage location descriptors in the rows of the segments table +to point to new deep storage locations (S3, HDFS, and local rewrite paths are supported). + +The tool has the following limitations: +- Only exporting from Derby metadata is currently supported +- If rewriting load specs for deep storage migration, only migrating from local deep storage is currently supported. + +## `export-metadata` Options + +The `export-metadata` tool provides the following options: + +### Connection Properties + +`--connectURI`: The URI of the Derby database, e.g. `jdbc:derby://localhost:1527/var/druid/metadata.db;create=true` +`--user`: Username +`--password`: Password +`--base`: corresponds to the value of `druid.metadata.storage.tables.base` in the configuration, `druid` by default. + +### Output Path + +`--output-path`, `-o`: The output directory of the tool. CSV files for the Druid segments, rules, config, datasource, and supervisors tables will be written to this directory. + +### Export Format Options + +`--use-hex-blobs`, `-x`: If set, export BLOB payload columns as hexadecimal strings. This needs to be set if importing back into Derby. Default is false. + +`--booleans-as-strings`, `-t`: If set, write boolean values as "true" or "false" instead of "1" and "0". This needs to be set if importing back into Derby. Default is false. + +### Deep Storage Migration + +#### Migration to S3 Deep Storage + +By setting the options below, the tool will rewrite the segment load specs to point to a new S3 deep storage location. + +This helps users migrate segments stored in local deep storage to S3. + +`--s3bucket`, `-b`: The S3 bucket that will hold the migrated segments +`--s3baseKey`, `-k`: The base S3 key where the migrated segments will be stored + +When copying the local deep storage segments to S3, the rewrite performed by this tool requires that the directory structure of the segments be unchanged. 
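+
+One way to perform that copy, shown here only as a sketch that assumes the AWS CLI is installed and uses the example storage directory and bucket described below, is a recursive structure-preserving sync:
+
+```bash
+# Copy the local segment tree to S3 without changing its directory layout.
+aws s3 sync /druid/segments s3://migration/example/
+```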
+
+For example, if the cluster had the following local deep storage configuration:
+
+```
+druid.storage.type=local
+druid.storage.storageDirectory=/druid/segments
+```
+
+If the target S3 bucket was `migration`, with a base key of `example`, the contents of `s3://migration/example/` must be identical to that of `/druid/segments` on the old local filesystem.
+
+#### Migration to HDFS Deep Storage
+
+By setting the options below, the tool will rewrite the segment load specs to point to a new HDFS deep storage location.
+
+This helps users migrate segments stored in local deep storage to HDFS.
+
+`--hadoopStorageDirectory`, `-h`: The HDFS path that will hold the migrated segments
+
+When copying the local deep storage segments to HDFS, the rewrite performed by this tool requires that the directory structure of the segments be unchanged, with the exception of directory names containing colons (`:`).
+
+For example, if the cluster had the following local deep storage configuration:
+
+```
+druid.storage.type=local
+druid.storage.storageDirectory=/druid/segments
+```
+
+If the target hadoopStorageDirectory was `/migration/example`, the contents of `hdfs:///migration/example/` must be identical to that of `/druid/segments` on the old local filesystem.
+
+Additionally, the segment paths in local deep storage contain colons (`:`) in their names, e.g.:
+
+`wikipedia/2016-06-27T02:00:00.000Z_2016-06-27T03:00:00.000Z/2019-05-03T21:57:15.950Z/1/index.zip`
+
+HDFS cannot store files containing colons, and this tool expects the colons to be replaced with underscores (`_`) in HDFS.
+
+In this example, the `wikipedia` segment above under `/druid/segments` in local deep storage would need to be migrated to HDFS under `hdfs:///migration/example/` with the following path:
+
+`wikipedia/2016-06-27T02_00_00.000Z_2016-06-27T03_00_00.000Z/2019-05-03T21_57_15.950Z/1/index.zip`
+
+#### Migration to New Local Deep Storage Path
+
+By setting the options below, the tool will rewrite the segment load specs to point to a new local deep storage location.
+
+This helps users migrate segments stored in local deep storage to a new path (e.g., a new NFS mount).
+
+`--newLocalPath`, `-n`: The new path on the local filesystem that will hold the migrated segments
+
+When copying the local deep storage segments to a new path, the rewrite performed by this tool requires that the directory structure of the segments be unchanged.
+
+For example, if the cluster had the following local deep storage configuration:
+
+```
+druid.storage.type=local
+druid.storage.storageDirectory=/druid/segments
+```
+
+If the new path was `/migration/example`, the contents of `/migration/example/` must be identical to that of `/druid/segments` on the local filesystem.
+
+## Running the tool
+
+To use the tool, you can run the following from the root of the Druid package:
+
+```bash
+cd ${DRUID_ROOT}
+mkdir -p /tmp/csv
+java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[] org.apache.druid.cli.Main tools export-metadata --connectURI "jdbc:derby://localhost:1527/var/druid/metadata.db;" -o /tmp/csv
+```
+
+In the example command above:
+- `lib` is the Druid lib directory
+- `extensions` is the Druid extensions directory
+- `/tmp/csv` is the output directory. Please make sure that this directory exists.
+
+## Importing Metadata
+
+After running the tool, the output directory will contain `_raw.csv` and `.csv` files.
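+
+For example, with the default `druid` table base, the output directory should contain a pair of files per table along these lines:
+
+```
+druid_segments_raw.csv    druid_segments.csv
+druid_rules_raw.csv       druid_rules.csv
+druid_config_raw.csv      druid_config.csv
+druid_dataSource_raw.csv  druid_dataSource.csv
+druid_supervisors_raw.csv druid_supervisors.csv
+```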
+ +The `_raw.csv` files are intermediate files used by the tool, containing the table data as exported by Derby without modification. + +The `.csv` files are used for import into another database such as MySQL and PostgreSQL and have any configured deep storage location rewrites applied. + +Example import commands for Derby, MySQL, and PostgreSQL are shown below. + +These example import commands expect `/tmp/csv` and its contents to be accessible from the server. For other options, such as importing from the client filesystem, please refer to the database's documentation. + +### Derby + +```sql +CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_SEGMENTS','/tmp/csv/druid_segments.csv',',','"',null,0); + +CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_RULES','/tmp/csv/druid_rules.csv',',','"',null,0); + +CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_CONFIG','/tmp/csv/druid_config.csv',',','"',null,0); + +CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_DATASOURCE','/tmp/csv/druid_dataSource.csv',',','"',null,0); + +CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'DRUID_SUPERVISORS','/tmp/csv/druid_supervisors.csv',',','"',null,0); +``` + +### MySQL + +```sql +LOAD DATA INFILE '/tmp/csv/druid_segments.csv' INTO TABLE druid_segments FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (id,dataSource,created_date,start,end,partitioned,version,used,payload); SHOW WARNINGS; + +LOAD DATA INFILE '/tmp/csv/druid_rules.csv' INTO TABLE druid_rules FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (id,dataSource,version,payload); SHOW WARNINGS; + +LOAD DATA INFILE '/tmp/csv/druid_config.csv' INTO TABLE druid_config FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (name,payload); SHOW WARNINGS; + +LOAD DATA INFILE '/tmp/csv/druid_dataSource.csv' INTO TABLE druid_dataSource FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (dataSource,created_date,commit_metadata_payload,commit_metadata_sha1); SHOW WARNINGS; + +LOAD DATA INFILE '/tmp/csv/druid_supervisors.csv' INTO TABLE druid_supervisors FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' (id,spec_id,created_date,payload); SHOW WARNINGS; +``` + +### PostgreSQL + +```sql +COPY druid_segments(id,dataSource,created_date,start,"end",partitioned,version,used,payload) FROM '/tmp/csv/druid_segments.csv' DELIMITER ',' CSV; + +COPY druid_rules(id,dataSource,version,payload) FROM '/tmp/csv/druid_rules.csv' DELIMITER ',' CSV; + +COPY druid_config(name,payload) FROM '/tmp/csv/druid_config.csv' DELIMITER ',' CSV; + +COPY druid_dataSource(dataSource,created_date,commit_metadata_payload,commit_metadata_sha1) FROM '/tmp/csv/druid_dataSource.csv' DELIMITER ',' CSV; + +COPY druid_supervisors(id,spec_id,created_date,payload) FROM '/tmp/csv/druid_supervisors.csv' DELIMITER ',' CSV; +``` \ No newline at end of file diff --git a/docs/content/operations/metadata-migration.md b/docs/content/operations/metadata-migration.md new file mode 100644 index 00000000000..95c05efe1ad --- /dev/null +++ b/docs/content/operations/metadata-migration.md @@ -0,0 +1,92 @@ +--- +layout: doc_page +title: "Metadata Migration" +--- + + + +# Metadata Migration + +If you have been running an evaluation Druid cluster using the built-in Derby metadata storage and wish to migrate to a +more production-capable metadata store such as MySQL or PostgreSQL, this document describes the necessary steps. + +## Shut down cluster services + +To ensure a clean migration, shut down the non-coordinator services to ensure that metadata state will not +change as you do the migration. 
+
+When migrating from Derby, the coordinator processes will still need to be up initially, as they host the Derby database.
+
+## Exporting metadata
+
+Druid provides an [Export Metadata Tool](../operations/export-metadata.html) for exporting metadata from Derby into CSV files
+which can then be imported into your new metadata store.
+
+The tool also provides options for rewriting the deep storage locations of segments; this is useful
+for [deep storage migration](../operations/deep-storage-migration.html).
+
+Run the `export-metadata` tool on your existing cluster, and save the CSV files it generates. After a successful export, you can shut down the coordinator.
+
+## Initializing the new metadata store
+
+### Create database
+
+Before importing the existing cluster metadata, you will need to set up the new metadata store.
+
+The [MySQL extension](../development/extensions-core/mysql.html) and [PostgreSQL extension](../development/extensions-core/postgresql.html) docs have instructions for initial database setup.
+
+### Update configuration
+
+Update your Druid runtime properties with the new metadata configuration.
+
+### Create Druid tables
+
+Druid provides a `metadata-init` tool for creating Druid's metadata tables. After initializing the Druid database, you can run the commands shown below from the root of the Druid package to initialize the tables.
+
+In the example commands below:
+- `lib` is the Druid lib directory
+- `extensions` is the Druid extensions directory
+- `base` corresponds to the value of `druid.metadata.storage.tables.base` in the configuration, `druid` by default.
+- The `--connectURI` parameter corresponds to the value of `druid.metadata.storage.connector.connectURI`.
+- The `--user` parameter corresponds to the value of `druid.metadata.storage.connector.user`.
+- The `--password` parameter corresponds to the value of `druid.metadata.storage.connector.password`.
+
+#### MySQL
+
+```bash
+cd ${DRUID_ROOT}
+java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"mysql-metadata-storage\"] -Ddruid.metadata.storage.type=mysql org.apache.druid.cli.Main tools metadata-init --connectURI="<connectURI>" --user <user> --password <password> --base druid
+```
+
+#### PostgreSQL
+
+```bash
+cd ${DRUID_ROOT}
+java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"postgresql-metadata-storage\"] -Ddruid.metadata.storage.type=postgresql org.apache.druid.cli.Main tools metadata-init --connectURI="<connectURI>" --user <user> --password <password> --base druid
+```
+
+### Import metadata
+
+After initializing the tables, please refer to the [import commands](../operations/export-metadata.html#importing-metadata) for your target database.
+
+### Restart cluster
+
+After importing the metadata successfully, you can now restart your cluster.
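+
+As an optional sanity check, you can confirm that the import populated the new tables before relying on the cluster. A minimal sketch, assuming a PostgreSQL target with a database and user both named `druid` (adjust the client and credentials for your setup):
+
+```bash
+psql -U druid -d druid -c "SELECT COUNT(*) FROM druid_segments;"
+```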
+ diff --git a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java index 8899ddc4adc..0bf3df8f305 100644 --- a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java @@ -33,6 +33,7 @@ import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.tweak.HandleCallback; @ManageLifecycle public class DerbyConnector extends SQLMetadataConnector @@ -113,6 +114,31 @@ public class DerbyConnector extends SQLMetadataConnector return "VALUES 1"; } + @Override + public void exportTable( + String tableName, + String outputPath + ) + { + retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + handle.createStatement( + StringUtils.format( + "CALL SYSCS_UTIL.SYSCS_EXPORT_TABLE (null, '%s', '%s', null, null, null)", + tableName, + outputPath + ) + ).execute(); + return null; + } + } + ); + } + @LifecycleStart public void start() { diff --git a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java new file mode 100644 index 00000000000..27f5509570e --- /dev/null +++ b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.cli; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.opencsv.CSVParser; +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.druid.guice.DruidProcessingModule; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.QueryRunnerFactoryModule; +import org.apache.druid.guice.QueryableModule; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.MetadataStorageConnectorConfig; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.server.DruidNode; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import javax.xml.bind.DatatypeConverter; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Command( + name = "export-metadata", + description = "Exports the contents of a Druid Derby metadata store to CSV files to assist with cluster migration. This tool also provides the ability to rewrite segment locations in the Derby metadata to assist with deep storage migration." 
+) +public class ExportMetadata extends GuiceRunnable +{ + @Option(name = "--connectURI", description = "Database JDBC connection string", required = true) + private String connectURI; + + @Option(name = "--user", description = "Database username") + private String user = null; + + @Option(name = "--password", description = "Database password") + private String password = null; + + @Option(name = "--base", description = "Base table name") + private String base = "druid"; + + @Option( + name = {"-b", "--s3bucket"}, + title = "s3bucket", + description = "S3 bucket of the migrated segments", + required = false) + public String s3Bucket = null; + + @Option( + name = {"-k", "--s3baseKey"}, + title = "s3baseKey", + description = "S3 baseKey of the migrated segments", + required = false) + public String s3baseKey = null; + + @Option( + name = {"-h", "--hadoopStorageDirectory"}, + title = "hadoopStorageDirectory", + description = "hadoopStorageDirectory of the migrated segments", + required = false) + public String hadoopStorageDirectory = null; + + @Option( + name = {"-n", "--newLocalPath"}, + title = "newLocalPath", + description = "newLocalPath of the migrated segments", + required = false) + public String newLocalPath = null; + + @Option( + name = {"-o", "--output-path"}, + title = "output-path", + description = "CSV output path", + required = false) + public String outputPath = null; + + @Option( + name = {"-x", "--use-hex-blobs"}, + title = "use-hex-blobs", + description = "Write BLOB payloads as hex strings", + required = false) + public boolean useHexBlobs = false; + + @Option( + name = {"-t", "--booleans-as-strings"}, + title = "booleans-as-strings", + description = "Write boolean values as true/false strings instead of 1/0", + required = false) + public boolean booleansAsStrings = false; + + private static final Logger log = new Logger(ExportMetadata.class); + + private static final CSVParser parser = new CSVParser(); + + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + public ExportMetadata() + { + super(log); + } + + @Override + protected List getModules() + { + return ImmutableList.of( + // This area is copied from CreateTables. + // It's unknown why those modules are required in CreateTables, and if all of those modules are required or not. + // Maybe some of those modules could be removed. 
+ // See https://github.com/apache/incubator-druid/pull/4429#discussion_r123602930 + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), + binder -> { + JsonConfigProvider.bindInstance( + binder, + Key.get(MetadataStorageConnectorConfig.class), + new MetadataStorageConnectorConfig() + { + @Override + public String getConnectURI() + { + return connectURI; + } + + @Override + public String getUser() + { + return user; + } + + @Override + public String getPassword() + { + return password; + } + } + ); + JsonConfigProvider.bindInstance( + binder, + Key.get(MetadataStorageTablesConfig.class), + MetadataStorageTablesConfig.fromBase(base) + ); + JsonConfigProvider.bindInstance( + binder, + Key.get(DruidNode.class, Self.class), + new DruidNode("tools", "localhost", false, -1, null, true, false) + ); + } + ); + } + + @Override + public void run() + { + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(ObjectMapper.class, jsonMapper); + injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + jsonMapper.setInjectableValues(injectableValues); + + if (hadoopStorageDirectory != null && newLocalPath != null) { + throw new IllegalArgumentException( + "Only one of s3Bucket, hadoopStorageDirectory, and newLocalPath can be set." + ); + } + + if (s3Bucket != null && (hadoopStorageDirectory != null || newLocalPath != null)) { + throw new IllegalArgumentException( + "Only one of s3Bucket, hadoopStorageDirectory, and newLocalPath can be set." + ); + } + + if (s3Bucket != null && s3baseKey == null) { + throw new IllegalArgumentException("s3baseKey must be set if s3Bucket is set."); + } + + final Injector injector = makeInjector(); + SQLMetadataConnector dbConnector = injector.getInstance(SQLMetadataConnector.class); + MetadataStorageTablesConfig metadataStorageTablesConfig = injector.getInstance(MetadataStorageTablesConfig.class); + + // We export a raw CSV first, and then apply some conversions for easier imports: + // Boolean strings are rewritten as 1 and 0 + // hexadecimal BLOB columns are rewritten with rewriteHexPayloadAsEscapedJson() + log.info("Exporting datasource table: " + metadataStorageTablesConfig.getDataSourceTable()); + exportTable(dbConnector, metadataStorageTablesConfig.getDataSourceTable(), true); + rewriteDatasourceExport(metadataStorageTablesConfig.getDataSourceTable()); + + log.info("Exporting segments table: " + metadataStorageTablesConfig.getSegmentsTable()); + exportTable(dbConnector, metadataStorageTablesConfig.getSegmentsTable(), true); + rewriteSegmentsExport(metadataStorageTablesConfig.getSegmentsTable()); + + log.info("Exporting rules table: " + metadataStorageTablesConfig.getRulesTable()); + exportTable(dbConnector, metadataStorageTablesConfig.getRulesTable(), true); + rewriteRulesExport(metadataStorageTablesConfig.getRulesTable()); + + log.info("Exporting config table: " + metadataStorageTablesConfig.getConfigTable()); + exportTable(dbConnector, metadataStorageTablesConfig.getConfigTable(), true); + rewriteConfigExport(metadataStorageTablesConfig.getConfigTable()); + + log.info("Exporting supervisor table: " + metadataStorageTablesConfig.getSupervisorTable()); + exportTable(dbConnector, metadataStorageTablesConfig.getSupervisorTable(), true); + rewriteSupervisorExport(metadataStorageTablesConfig.getSupervisorTable()); + } + + private void exportTable( + SQLMetadataConnector dbConnector, + String tableName, + boolean withRawFilename + ) + { + String 
pathFormatString; + if (withRawFilename) { + pathFormatString = "%s/%s_raw.csv"; + } else { + pathFormatString = "%s/%s.csv"; + } + dbConnector.exportTable( + StringUtils.toUpperCase(tableName), + StringUtils.format(pathFormatString, outputPath, tableName) + ); + } + + private void rewriteDatasourceExport( + String datasourceTableName + ) + { + String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, datasourceTableName); + String outFile = StringUtils.format("%s/%s.csv", outputPath, datasourceTableName); + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8) + ); + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8); + ) { + String line; + while ((line = reader.readLine()) != null) { + String[] parsed = parser.parseLine(line); + + StringBuilder newLineBuilder = new StringBuilder(); + newLineBuilder.append(parsed[0]).append(","); //dataSource + newLineBuilder.append(parsed[1]).append(","); //created_date + newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[2])).append(","); //commit_metadata_payload + newLineBuilder.append(parsed[3]); //commit_metadata_sha1 + newLineBuilder.append("\n"); + writer.write(newLineBuilder.toString()); + + } + } + catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + private void rewriteRulesExport( + String rulesTableName + ) + { + String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, rulesTableName); + String outFile = StringUtils.format("%s/%s.csv", outputPath, rulesTableName); + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8) + ); + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8); + ) { + String line; + while ((line = reader.readLine()) != null) { + String[] parsed = parser.parseLine(line); + + StringBuilder newLineBuilder = new StringBuilder(); + newLineBuilder.append(parsed[0]).append(","); //id + newLineBuilder.append(parsed[1]).append(","); //dataSource + newLineBuilder.append(parsed[2]).append(","); //version + newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[3])); //payload + newLineBuilder.append("\n"); + writer.write(newLineBuilder.toString()); + + } + } + catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + private void rewriteConfigExport( + String configTableName + ) + { + String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, configTableName); + String outFile = StringUtils.format("%s/%s.csv", outputPath, configTableName); + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8) + ); + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8); + ) { + String line; + while ((line = reader.readLine()) != null) { + String[] parsed = parser.parseLine(line); + + StringBuilder newLineBuilder = new StringBuilder(); + newLineBuilder.append(parsed[0]).append(","); //name + newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[1])); //payload + newLineBuilder.append("\n"); + writer.write(newLineBuilder.toString()); + + } + } + catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + private void rewriteSupervisorExport( + String supervisorTableName + ) + { + String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, supervisorTableName); + String outFile = StringUtils.format("%s/%s.csv", 
outputPath, supervisorTableName); + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8) + ); + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8); + ) { + String line; + while ((line = reader.readLine()) != null) { + String[] parsed = parser.parseLine(line); + + StringBuilder newLineBuilder = new StringBuilder(); + newLineBuilder.append(parsed[0]).append(","); //id + newLineBuilder.append(parsed[1]).append(","); //spec_id + newLineBuilder.append(parsed[2]).append(","); //created_date + newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[3])); //payload + newLineBuilder.append("\n"); + writer.write(newLineBuilder.toString()); + + } + } + catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + + private void rewriteSegmentsExport( + String segmentsTableName + ) + { + String inFile = StringUtils.format(("%s/%s_raw.csv"), outputPath, segmentsTableName); + String outFile = StringUtils.format("%s/%s.csv", outputPath, segmentsTableName); + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(inFile), Charsets.UTF_8) + ); + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(outFile), Charsets.UTF_8); + ) { + String line; + while ((line = reader.readLine()) != null) { + String[] parsed = parser.parseLine(line); + StringBuilder newLineBuilder = new StringBuilder(); + newLineBuilder.append(parsed[0]).append(","); //id + newLineBuilder.append(parsed[1]).append(","); //dataSource + newLineBuilder.append(parsed[2]).append(","); //created_date + newLineBuilder.append(parsed[3]).append(","); //start + newLineBuilder.append(parsed[4]).append(","); //end + newLineBuilder.append(convertBooleanString(parsed[5])).append(","); //partitioned + newLineBuilder.append(parsed[6]).append(","); //version + newLineBuilder.append(convertBooleanString(parsed[7])).append(","); //used + + if (s3Bucket != null || hadoopStorageDirectory != null || newLocalPath != null) { + newLineBuilder.append(makePayloadWithConvertedLoadSpec(parsed[8])); + } else { + newLineBuilder.append(rewriteHexPayloadAsEscapedJson(parsed[8])); //payload + } + newLineBuilder.append("\n"); + writer.write(newLineBuilder.toString()); + + } + } + catch (IOException ioex) { + throw new RuntimeException(ioex); + } + } + + /** + * Returns a new load spec in escaped JSON form, with the new deep storage location if configured. 
+ */ + private String makePayloadWithConvertedLoadSpec( + String payload + ) throws IOException + { + DataSegment segment = jsonMapper.readValue(DatatypeConverter.parseHexBinary(payload), DataSegment.class); + String uniqueId = getUniqueIDFromLocalLoadSpec(segment.getLoadSpec()); + String segmentPath = DataSegmentPusher.getDefaultStorageDirWithExistingUniquePath(segment, uniqueId); + + Map newLoadSpec = null; + if (s3Bucket != null) { + newLoadSpec = makeS3LoadSpec(segmentPath); + } else if (hadoopStorageDirectory != null) { + newLoadSpec = makeHDFSLoadSpec(segmentPath); + } else if (newLocalPath != null) { + newLoadSpec = makeLocalLoadSpec(segmentPath); + } + + if (newLoadSpec != null) { + segment = new DataSegment( + segment.getDataSource(), + segment.getInterval(), + segment.getVersion(), + newLoadSpec, + segment.getDimensions(), + segment.getMetrics(), + segment.getShardSpec(), + segment.getBinaryVersion(), + segment.getSize() + ); + } + + String serialized = jsonMapper.writeValueAsString(segment); + if (useHexBlobs) { + return DatatypeConverter.printHexBinary(StringUtils.toUtf8(serialized)); + } else { + return escapeJSONForCSV(serialized); + } + } + + /** + * Derby's export tool writes BLOB columns as a hexadecimal string: + * https://db.apache.org/derby/docs/10.9/adminguide/cadminimportlobs.html + * + * Decodes the hex string and escapes the decoded JSON. + */ + private String rewriteHexPayloadAsEscapedJson( + String payload + ) + { + if (useHexBlobs) { + return payload; + } + String json = StringUtils.fromUtf8(DatatypeConverter.parseHexBinary(payload)); + return escapeJSONForCSV(json); + } + + private String convertBooleanString(String booleanString) + { + if (booleansAsStrings) { + return booleanString; + } else { + return "true".equals(booleanString) ? "1" : "0"; + } + } + + private String escapeJSONForCSV(String json) + { + return "\"" + StringUtils.replace(json, "\"", "\"\"") + "\""; + } + + private Map makeS3LoadSpec( + String segmentPath + ) + { + return ImmutableMap.of( + "type", "s3_zip", + "bucket", s3Bucket, + "key", StringUtils.format("%s/%s/index.zip", s3baseKey, segmentPath) + ); + } + + /** + * Makes an HDFS spec, replacing colons with underscores. HDFS doesn't support colons in filenames. + */ + private Map makeHDFSLoadSpec( + String segmentPath + ) + { + return ImmutableMap.of( + "type", "hdfs", + "path", StringUtils.format("%s/%s/index.zip", hadoopStorageDirectory, segmentPath.replace(':', '_')) + ); + } + + private Map makeLocalLoadSpec( + String segmentPath + ) + { + return ImmutableMap.of( + "type", "local", + "path", StringUtils.format("%s/%s/index.zip", newLocalPath, segmentPath) + ); + } + + /** + * Looks for an optional unique path component in the segment path. + * The unique path is used for segments created by realtime indexing tasks like Kafka. 
+ */ + @Nullable + private String getUniqueIDFromLocalLoadSpec( + Map localLoadSpec + ) + { + String[] splits = ((String) localLoadSpec.get("path")).split("/"); + if (splits.length < 2) { + return null; + } + String maybeUUID = splits[splits.length - 2]; + + try { + UUID.fromString(maybeUUID); + return maybeUUID; + } + catch (IllegalArgumentException iae) { + return null; + } + } +} diff --git a/services/src/main/java/org/apache/druid/cli/Main.java b/services/src/main/java/org/apache/druid/cli/Main.java index f54d0b40574..34f00a0fa7f 100644 --- a/services/src/main/java/org/apache/druid/cli/Main.java +++ b/services/src/main/java/org/apache/druid/cli/Main.java @@ -80,7 +80,8 @@ public class Main CreateTables.class, DumpSegment.class, ResetCluster.class, - ValidateSegments.class + ValidateSegments.class, + ExportMetadata.class ); builder.withGroup("tools") .withDescription("Various tools for working with Druid")