diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 2235916a105..03675a28aa5 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -29,8 +29,11 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; import com.google.inject.Inject; import com.metamx.common.IAE; +import com.metamx.common.StringUtils; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.logger.Logger; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -447,10 +450,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor // Avoiding ON DUPLICATE KEY since it's not portable. // Avoiding try/catch since it may cause inadvertent transaction-splitting. + // UNIQUE key for the row, ensuring sequences do not fork in two directions. + // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) + final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putBytes(StringUtils.toUtf8(sequenceName)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .hash() + .asBytes() + ); + handle.createStatement( String.format( - "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :payload)", + "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", dbTables.getPendingSegmentsTable() ) ) @@ -461,6 +477,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .bind("end", interval.getEnd().toString()) .bind("sequence_name", sequenceName) .bind("sequence_prev_id", previousSegmentIdNotNull) + .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) .execute(); diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java index 3846298ba3f..a004e69927c 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java @@ -197,9 +197,10 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector + " \"end\" VARCHAR(255) NOT NULL,\n" + " sequence_name VARCHAR(255) NOT NULL,\n" + " sequence_prev_id VARCHAR(255) NOT NULL,\n" + + " sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n" + " payload %2$s NOT NULL,\n" + " PRIMARY KEY (id),\n" - + " UNIQUE (sequence_name, sequence_prev_id)\n" + + " UNIQUE (sequence_name_prev_id_sha1)\n" + ")", tableName, getPayloadType() )