MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of reusing conections. Contributed by Kannan Rajah.
(cherry picked from commit 241336ca2b
)
Conflicts:
hadoop-mapreduce-project/CHANGES.txt
This commit is contained in:
parent
ef212fbe7e
commit
5b3d9bf636
|
@ -1,5 +1,20 @@
|
||||||
Hadoop MapReduce Change Log
|
Hadoop MapReduce Change Log
|
||||||
|
|
||||||
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
|
INCOMPATIBLE CHANGES
|
||||||
|
|
||||||
|
NEW FEATURES
|
||||||
|
|
||||||
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
BUG FIXES
|
||||||
|
|
||||||
|
MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of
|
||||||
|
reusing conections. (Kannan Rajah via ozawa)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -159,7 +159,7 @@ public class DBInputFormat<T extends DBWritable>
|
||||||
dbConf = new DBConfiguration(conf);
|
dbConf = new DBConfiguration(conf);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
getConnection();
|
this.connection = createConnection();
|
||||||
|
|
||||||
DatabaseMetaData dbMeta = connection.getMetaData();
|
DatabaseMetaData dbMeta = connection.getMetaData();
|
||||||
this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
|
this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
|
||||||
|
@ -182,18 +182,25 @@ public class DBInputFormat<T extends DBWritable>
|
||||||
}
|
}
|
||||||
|
|
||||||
public Connection getConnection() {
|
public Connection getConnection() {
|
||||||
|
// TODO Remove this code that handles backward compatibility.
|
||||||
|
if (this.connection == null) {
|
||||||
|
this.connection = createConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Connection createConnection() {
|
||||||
try {
|
try {
|
||||||
if (null == this.connection) {
|
Connection newConnection = dbConf.getConnection();
|
||||||
// The connection was closed; reinstantiate it.
|
newConnection.setAutoCommit(false);
|
||||||
this.connection = dbConf.getConnection();
|
newConnection.setTransactionIsolation(
|
||||||
this.connection.setAutoCommit(false);
|
Connection.TRANSACTION_SERIALIZABLE);
|
||||||
this.connection.setTransactionIsolation(
|
|
||||||
Connection.TRANSACTION_SERIALIZABLE);
|
return newConnection;
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
return connection;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getDBProductName() {
|
public String getDBProductName() {
|
||||||
|
@ -210,17 +217,17 @@ public class DBInputFormat<T extends DBWritable>
|
||||||
if (dbProductName.startsWith("ORACLE")) {
|
if (dbProductName.startsWith("ORACLE")) {
|
||||||
// use Oracle-specific db reader.
|
// use Oracle-specific db reader.
|
||||||
return new OracleDBRecordReader<T>(split, inputClass,
|
return new OracleDBRecordReader<T>(split, inputClass,
|
||||||
conf, getConnection(), getDBConf(), conditions, fieldNames,
|
conf, createConnection(), getDBConf(), conditions, fieldNames,
|
||||||
tableName);
|
tableName);
|
||||||
} else if (dbProductName.startsWith("MYSQL")) {
|
} else if (dbProductName.startsWith("MYSQL")) {
|
||||||
// use MySQL-specific db reader.
|
// use MySQL-specific db reader.
|
||||||
return new MySQLDBRecordReader<T>(split, inputClass,
|
return new MySQLDBRecordReader<T>(split, inputClass,
|
||||||
conf, getConnection(), getDBConf(), conditions, fieldNames,
|
conf, createConnection(), getDBConf(), conditions, fieldNames,
|
||||||
tableName);
|
tableName);
|
||||||
} else {
|
} else {
|
||||||
// Generic reader.
|
// Generic reader.
|
||||||
return new DBRecordReader<T>(split, inputClass,
|
return new DBRecordReader<T>(split, inputClass,
|
||||||
conf, getConnection(), getDBConf(), conditions, fieldNames,
|
conf, createConnection(), getDBConf(), conditions, fieldNames,
|
||||||
tableName);
|
tableName);
|
||||||
}
|
}
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
|
|
|
@ -178,7 +178,6 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
|
||||||
|
|
||||||
ResultSet results = null;
|
ResultSet results = null;
|
||||||
Statement statement = null;
|
Statement statement = null;
|
||||||
Connection connection = getConnection();
|
|
||||||
try {
|
try {
|
||||||
statement = connection.createStatement();
|
statement = connection.createStatement();
|
||||||
|
|
||||||
|
@ -289,12 +288,12 @@ public class DataDrivenDBInputFormat<T extends DBWritable>
|
||||||
if (dbProductName.startsWith("MYSQL")) {
|
if (dbProductName.startsWith("MYSQL")) {
|
||||||
// use MySQL-specific db reader.
|
// use MySQL-specific db reader.
|
||||||
return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
|
return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
|
||||||
conf, getConnection(), dbConf, dbConf.getInputConditions(),
|
conf, createConnection(), dbConf, dbConf.getInputConditions(),
|
||||||
dbConf.getInputFieldNames(), dbConf.getInputTableName());
|
dbConf.getInputFieldNames(), dbConf.getInputTableName());
|
||||||
} else {
|
} else {
|
||||||
// Generic reader.
|
// Generic reader.
|
||||||
return new DataDrivenDBRecordReader<T>(split, inputClass,
|
return new DataDrivenDBRecordReader<T>(split, inputClass,
|
||||||
conf, getConnection(), dbConf, dbConf.getInputConditions(),
|
conf, createConnection(), dbConf, dbConf.getInputConditions(),
|
||||||
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
|
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
|
||||||
dbProductName);
|
dbProductName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class OracleDataDrivenDBInputFormat<T extends DBWritable>
|
||||||
try {
|
try {
|
||||||
// Use Oracle-specific db reader
|
// Use Oracle-specific db reader
|
||||||
return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
|
return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
|
||||||
conf, getConnection(), dbConf, dbConf.getInputConditions(),
|
conf, createConnection(), dbConf, dbConf.getInputConditions(),
|
||||||
dbConf.getInputFieldNames(), dbConf.getInputTableName());
|
dbConf.getInputFieldNames(), dbConf.getInputTableName());
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
throw new IOException(ex.getMessage());
|
throw new IOException(ex.getMessage());
|
||||||
|
|
Loading…
Reference in New Issue