mirror of https://github.com/apache/druid.git
modified "end" column to `end` (#3903)
* modified "end" column to `end`. "end" is interpretted as a string rather than dereferencing the column value * SQLMetadataConnector.getQuoteString defines the string that should be used to quote string fields * positional arguments for String.format * for Connectors that use " need to include the \ escape as well
parent 991e2852da
commit c1eee9bbf3
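As an illustrative sketch (not part of this commit), this is how the positional String.format arguments introduced below combine a table name with the connector's quote string; the table name is assumed, and the backtick matches MySQLConnector's QUOTE_STRING in this diff:

// Sketch: expanding one of the rewritten query strings with positional arguments.
// %1$s is the table name, %2$s the connector quote string (here "`", as in MySQLConnector).
public class QuoteStringSketch
{
  public static void main(String[] args)
  {
    String tableName = "druid_segments";  // illustrative table name
    String quoteString = "`";             // MySQLConnector.QUOTE_STRING in this commit

    String sql = String.format(
        "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource",
        tableName, quoteString
    );

    // Prints: SELECT start, `end` FROM druid_segments WHERE dataSource = :dataSource
    System.out.println(sql);
  }
}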
@@ -110,6 +110,7 @@ public class SQLServerConnector extends SQLMetadataConnector
    */
   private static final String SERIAL_TYPE = "[bigint] IDENTITY (1, 1)";

+  private static final String QUOTE_STRING = "\\\"";
   public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;

   private final DBI dbi;
@@ -193,6 +194,11 @@ public class SQLServerConnector extends SQLMetadataConnector
     return SERIAL_TYPE;
   }

+  @Override
+  public String getQuoteString() {
+    return QUOTE_STRING;
+  }
+
   @Override
   protected int getStreamingFetchSize()
   {
@@ -104,7 +104,7 @@ public class CustomStatementRewriterTest
        + " dataSource VARCHAR(255) NOT NULL,\n"
        + " created_date VARCHAR(255) NOT NULL,\n"
        + " start VARCHAR(255) NOT NULL,\n"
-       + " \"end\" VARCHAR(255) NOT NULL,\n"
+       + " `end` VARCHAR(255) NOT NULL,\n"
        + " partitioned BOOLEAN NOT NULL,\n"
        + " version VARCHAR(255) NOT NULL,\n"
        + " used BOOLEAN NOT NULL,\n"
@@ -117,7 +117,7 @@ public class CustomStatementRewriterTest
        " dataSource VARCHAR(255) NOT NULL,\n" +
        " created_date VARCHAR(255) NOT NULL,\n" +
        " start VARCHAR(255) NOT NULL,\n" +
-       " \"end\" VARCHAR(255) NOT NULL,\n" +
+       " `end` VARCHAR(255) NOT NULL,\n" +
        " partitioned BIT NOT NULL,\n" +
        " version VARCHAR(255) NOT NULL,\n" +
        " used BIT NOT NULL,\n" +
@@ -42,6 +42,7 @@ public class MySQLConnector extends SQLMetadataConnector
   private static final Logger log = new Logger(MySQLConnector.class);
   private static final String PAYLOAD_TYPE = "LONGBLOB";
   private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
+  private static final String QUOTE_STRING = "`";

   private final DBI dbi;

@@ -76,6 +77,11 @@ public class MySQLConnector extends SQLMetadataConnector
     return SERIAL_TYPE;
   }

+  @Override
+  public String getQuoteString() {
+    return QUOTE_STRING;
+  }
+
   @Override
   protected int getStreamingFetchSize()
   {
@@ -40,6 +40,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
   private static final Logger log = new Logger(PostgreSQLConnector.class);
   private static final String PAYLOAD_TYPE = "BYTEA";
   private static final String SERIAL_TYPE = "BIGSERIAL";
+  private static final String QUOTE_STRING = "\\\"";
   public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;

   private final DBI dbi;
@@ -73,6 +74,11 @@ public class PostgreSQLConnector extends SQLMetadataConnector
     return SERIAL_TYPE;
   }

+  @Override
+  public String getQuoteString() {
+    return QUOTE_STRING;
+  }
+
   @Override
   protected int getStreamingFetchSize()
   {
@@ -38,11 +38,13 @@ import java.util.List;
 public class SQLMetadataStorageUpdaterJobHandler implements MetadataStorageUpdaterJobHandler
 {
   private static final Logger log = new Logger(SQLMetadataStorageUpdaterJobHandler.class);
+  private final SQLMetadataConnector connector;
   private final IDBI dbi;

   @Inject
   public SQLMetadataStorageUpdaterJobHandler(SQLMetadataConnector connector)
   {
+    this.connector = connector;
     this.dbi = connector.getDBI();
   }

@@ -56,9 +58,9 @@ public class SQLMetadataStorageUpdaterJobHandler implements MetadataStorageUpdat
     {
       final PreparedBatch batch = handle.prepareBatch(
           String.format(
-              "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+              "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
              + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-              tableName
+              tableName, connector.getQuoteString()
           )
       );
       for (final DataSegment segment : segments) {
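For illustration only (not in the commit): the rewritten INSERT quotes just the column identifier, while the JDBI named parameter :end in the VALUES clause is unchanged. Assuming a backtick quote string and a hypothetical table name:

// Sketch: the text produced by the new format string. Only the `end` column
// identifier is quoted; the :end bind parameter stays as-is.
public class InsertFormatSketch
{
  public static void main(String[] args)
  {
    String tableName = "druid_segments";  // hypothetical table name
    String quoteString = "`";             // e.g. MySQLConnector's quote string

    String sql = String.format(
        "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
        + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
        tableName, quoteString
    );

    // Prints (wrapped here for readability):
    //   INSERT INTO druid_segments (id, dataSource, created_date, start, `end`, partitioned, version, used, payload)
    //   VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)
    System.out.println(sql);
  }
}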
@@ -181,8 +181,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     final ResultIterator<byte[]> dbSegments =
         handle.createQuery(
             String.format(
-                "SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start",
-                dbTables.getPendingSegmentsTable()
+                "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start <= :end and %2$send%2$s >= :start",
+                dbTables.getPendingSegmentsTable(), connector.getQuoteString()
             )
         )
         .bind("dataSource", dataSource)
@@ -219,7 +219,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (");
     for (int i = 0; i < intervals.size(); i++) {
       sb.append(
-          "(start <= ? AND \"end\" >= ?)"
+          String.format("(start <= ? AND %1$send%1$s >= ?)", connector.getQuoteString())
       );
       if (i == intervals.size() - 1) {
         sb.append(")");
@@ -560,9 +560,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor

       handle.createStatement(
           String.format(
-              "INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
+              "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
              + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
-              dbTables.getPendingSegmentsTable()
+              dbTables.getPendingSegmentsTable(), connector.getQuoteString()
          )
       )
       .bind("id", newIdentifier.getIdentifierAsString())
@@ -614,9 +614,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     // Avoiding try/catch since it may cause inadvertent transaction-splitting.
     handle.createStatement(
         String.format(
-            "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+            "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
            + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-            dbTables.getSegmentsTable()
+            dbTables.getSegmentsTable(), connector.getQuoteString()
        )
     )
     .bind("id", segment.getIdentifier())
@@ -926,8 +926,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     return handle
         .createQuery(
             String.format(
-                "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false",
-                dbTables.getSegmentsTable()
+                "SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start and %2$send%2$s <= :end and used = false",
+                dbTables.getSegmentsTable(), connector.getQuoteString()
             )
         )
         .setFetchSize(connector.getStreamingFetchSize())
@@ -107,6 +107,11 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
    */
   protected abstract int getStreamingFetchSize();

+  /**
+   * @return the string that should be used to quote string fields
+   */
+  public abstract String getQuoteString();
+
   public String getValidationQuery() { return "SELECT 1"; }

   public abstract boolean tableExists(Handle handle, final String tableName);
@@ -212,7 +217,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
                + " dataSource VARCHAR(255) NOT NULL,\n"
                + " created_date VARCHAR(255) NOT NULL,\n"
                + " start VARCHAR(255) NOT NULL,\n"
-               + " \"end\" VARCHAR(255) NOT NULL,\n"
+               + " %3$send%3$s VARCHAR(255) NOT NULL,\n"
                + " sequence_name VARCHAR(255) NOT NULL,\n"
                + " sequence_prev_id VARCHAR(255) NOT NULL,\n"
                + " sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"
@@ -220,7 +225,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
                + " PRIMARY KEY (id),\n"
                + " UNIQUE (sequence_name_prev_id_sha1)\n"
                + ")",
-               tableName, getPayloadType()
+               tableName, getPayloadType(), getQuoteString()
            )
        )
    );
@@ -256,14 +261,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
                + " dataSource VARCHAR(255) NOT NULL,\n"
                + " created_date VARCHAR(255) NOT NULL,\n"
                + " start VARCHAR(255) NOT NULL,\n"
-               + " \"end\" VARCHAR(255) NOT NULL,\n"
+               + " %3$send%3$s VARCHAR(255) NOT NULL,\n"
                + " partitioned BOOLEAN NOT NULL,\n"
                + " version VARCHAR(255) NOT NULL,\n"
                + " used BOOLEAN NOT NULL,\n"
                + " payload %2$s NOT NULL,\n"
                + " PRIMARY KEY (id)\n"
                + ")",
-               tableName, getPayloadType()
+               tableName, getPayloadType(), getQuoteString()
            ),
            String.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource)", tableName),
            String.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", tableName)
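A small assumed expansion (not from the commit) of the three-argument DDL format above, where %3$s now supplies the quote string around end; the table name, payload type, and backtick below are illustrative MySQL-style values and the column list is abbreviated:

// Sketch: how the segment-table DDL format expands once getQuoteString() is the
// third argument. Values below are assumptions; the column list is shortened.
public class CreateTableFormatSketch
{
  public static void main(String[] args)
  {
    String ddl = String.format(
        "CREATE TABLE %1$s (\n"
        + "  id VARCHAR(255) NOT NULL,\n"
        + "  start VARCHAR(255) NOT NULL,\n"
        + "  %3$send%3$s VARCHAR(255) NOT NULL,\n"
        + "  payload %2$s NOT NULL,\n"
        + "  PRIMARY KEY (id)\n"
        + ")",
        "druid_segments", "LONGBLOB", "`"
    );

    // The %3$s placeholder reuses the third argument on both sides of "end",
    // so that column line renders as:  `end` VARCHAR(255) NOT NULL,
    System.out.println(ddl);
  }
}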
@@ -554,8 +554,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     Iterator<Interval> iter = handle
         .createQuery(
             String.format(
-                "SELECT start, \"end\" FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false ORDER BY start, \"end\"",
-                getSegmentsTable()
+                "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource and start >= :start and %2$send%2$s <= :end and used = false ORDER BY start, %2$send%2$s",
+                getSegmentsTable(), connector.getQuoteString()
            )
         )
         .setFetchSize(connector.getStreamingFetchSize())
@@ -55,9 +55,9 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
     this.config = config;
     this.connector = connector;
     this.statement = String.format(
-        "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+        "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
        + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-        config.getSegmentsTable()
+        config.getSegmentsTable(), connector.getQuoteString()
     );
   }

@@ -38,6 +38,7 @@ public class DerbyConnector extends SQLMetadataConnector
 {
   private static final Logger log = new Logger(DerbyConnector.class);
   private static final String SERIAL_TYPE = "BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)";
+  private static final String QUOTE_STRING = "\\\"";
   private final DBI dbi;
   private final MetadataStorage storage;

@@ -86,6 +87,11 @@ public class DerbyConnector extends SQLMetadataConnector
     return SERIAL_TYPE;
   }

+  @Override
+  public String getQuoteString() {
+    return QUOTE_STRING;
+  }
+
   @Override
   public DBI getDBI() { return dbi; }
