mirror of https://github.com/apache/druid.git
Merge pull request #2321 from gianm/pending-index
Replace two-column index on pendingSegments table with one-column index.
This commit is contained in:
commit
7a6109f0ca
|
@ -29,8 +29,11 @@ import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Ordering;
|
import com.google.common.collect.Ordering;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
import com.google.common.hash.Hashing;
|
||||||
|
import com.google.common.io.BaseEncoding;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.metamx.common.IAE;
|
import com.metamx.common.IAE;
|
||||||
|
import com.metamx.common.StringUtils;
|
||||||
import com.metamx.common.lifecycle.LifecycleStart;
|
import com.metamx.common.lifecycle.LifecycleStart;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||||
|
@ -447,10 +450,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
// Avoiding ON DUPLICATE KEY since it's not portable.
|
// Avoiding ON DUPLICATE KEY since it's not portable.
|
||||||
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
|
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
|
||||||
|
|
||||||
|
// UNIQUE key for the row, ensuring sequences do not fork in two directions.
|
||||||
|
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
|
||||||
|
// have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319)
|
||||||
|
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
|
||||||
|
Hashing.sha1()
|
||||||
|
.newHasher()
|
||||||
|
.putBytes(StringUtils.toUtf8(sequenceName))
|
||||||
|
.putByte((byte) 0xff)
|
||||||
|
.putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
|
||||||
|
.hash()
|
||||||
|
.asBytes()
|
||||||
|
);
|
||||||
|
|
||||||
handle.createStatement(
|
handle.createStatement(
|
||||||
String.format(
|
String.format(
|
||||||
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, payload) "
|
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
|
||||||
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :payload)",
|
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
|
||||||
dbTables.getPendingSegmentsTable()
|
dbTables.getPendingSegmentsTable()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -461,6 +477,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
.bind("end", interval.getEnd().toString())
|
.bind("end", interval.getEnd().toString())
|
||||||
.bind("sequence_name", sequenceName)
|
.bind("sequence_name", sequenceName)
|
||||||
.bind("sequence_prev_id", previousSegmentIdNotNull)
|
.bind("sequence_prev_id", previousSegmentIdNotNull)
|
||||||
|
.bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
|
||||||
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
|
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
|
||||||
.execute();
|
.execute();
|
||||||
|
|
||||||
|
|
|
@ -197,9 +197,10 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||||
+ " \"end\" VARCHAR(255) NOT NULL,\n"
|
+ " \"end\" VARCHAR(255) NOT NULL,\n"
|
||||||
+ " sequence_name VARCHAR(255) NOT NULL,\n"
|
+ " sequence_name VARCHAR(255) NOT NULL,\n"
|
||||||
+ " sequence_prev_id VARCHAR(255) NOT NULL,\n"
|
+ " sequence_prev_id VARCHAR(255) NOT NULL,\n"
|
||||||
|
+ " sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"
|
||||||
+ " payload %2$s NOT NULL,\n"
|
+ " payload %2$s NOT NULL,\n"
|
||||||
+ " PRIMARY KEY (id),\n"
|
+ " PRIMARY KEY (id),\n"
|
||||||
+ " UNIQUE (sequence_name, sequence_prev_id)\n"
|
+ " UNIQUE (sequence_name_prev_id_sha1)\n"
|
||||||
+ ")",
|
+ ")",
|
||||||
tableName, getPayloadType()
|
tableName, getPayloadType()
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue