HDFS-10560. DiskBalancer: Reuse ObjectMapper instance to improve the performance. Contributed by Yiqun Lin.
This commit is contained in:
parent
b427ce12bc
commit
b047bc7270
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
|
import org.apache.htrace.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -34,6 +35,10 @@ import java.io.IOException;
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||||
public class DiskBalancerWorkItem {
|
public class DiskBalancerWorkItem {
|
||||||
|
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||||
|
private static final ObjectReader READER =
|
||||||
|
new ObjectMapper().reader(DiskBalancerWorkItem.class);
|
||||||
|
|
||||||
private long startTime;
|
private long startTime;
|
||||||
private long secondsElapsed;
|
private long secondsElapsed;
|
||||||
private long bytesToCopy;
|
private long bytesToCopy;
|
||||||
|
@ -74,8 +79,7 @@ public class DiskBalancerWorkItem {
|
||||||
*/
|
*/
|
||||||
public static DiskBalancerWorkItem parseJson(String json) throws IOException {
|
public static DiskBalancerWorkItem parseJson(String json) throws IOException {
|
||||||
Preconditions.checkNotNull(json);
|
Preconditions.checkNotNull(json);
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return READER.readValue(json);
|
||||||
return mapper.readValue(json, DiskBalancerWorkItem.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -169,8 +173,7 @@ public class DiskBalancerWorkItem {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public String toJson() throws IOException {
|
public String toJson() throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return MAPPER.writeValueAsString(this);
|
||||||
return mapper.writeValueAsString(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
import org.codehaus.jackson.map.SerializationConfig;
|
import org.codehaus.jackson.map.SerializationConfig;
|
||||||
|
|
||||||
import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance;
|
import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance;
|
||||||
|
@ -38,6 +39,15 @@ import java.util.LinkedList;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class DiskBalancerWorkStatus {
|
public class DiskBalancerWorkStatus {
|
||||||
|
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||||
|
private static final ObjectMapper MAPPER_WITH_INDENT_OUTPUT =
|
||||||
|
new ObjectMapper().enable(
|
||||||
|
SerializationConfig.Feature.INDENT_OUTPUT);
|
||||||
|
private static final ObjectReader READER_WORKSTATUS =
|
||||||
|
new ObjectMapper().reader(DiskBalancerWorkStatus.class);
|
||||||
|
private static final ObjectReader READER_WORKENTRY = new ObjectMapper()
|
||||||
|
.reader(defaultInstance().constructCollectionType(List.class,
|
||||||
|
DiskBalancerWorkEntry.class));
|
||||||
|
|
||||||
private final List<DiskBalancerWorkEntry> currentState;
|
private final List<DiskBalancerWorkEntry> currentState;
|
||||||
private Result result;
|
private Result result;
|
||||||
|
@ -92,10 +102,7 @@ public class DiskBalancerWorkStatus {
|
||||||
this.result = result;
|
this.result = result;
|
||||||
this.planID = planID;
|
this.planID = planID;
|
||||||
this.planFile = planFile;
|
this.planFile = planFile;
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
this.currentState = READER_WORKENTRY.readValue(currentState);
|
||||||
this.currentState = mapper.readValue(currentState,
|
|
||||||
defaultInstance().constructCollectionType(
|
|
||||||
List.class, DiskBalancerWorkEntry.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -141,15 +148,11 @@ public class DiskBalancerWorkStatus {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
**/
|
**/
|
||||||
public String currentStateString() throws IOException {
|
public String currentStateString() throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return MAPPER_WITH_INDENT_OUTPUT.writeValueAsString(currentState);
|
||||||
mapper.enable(SerializationConfig.Feature.INDENT_OUTPUT);
|
|
||||||
return mapper.writeValueAsString(currentState);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toJsonString() throws IOException {
|
public String toJsonString() throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return MAPPER.writeValueAsString(this);
|
||||||
return mapper.writeValueAsString(this);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -160,8 +163,7 @@ public class DiskBalancerWorkStatus {
|
||||||
*/
|
*/
|
||||||
public static DiskBalancerWorkStatus parseJson(String json) throws
|
public static DiskBalancerWorkStatus parseJson(String json) throws
|
||||||
IOException {
|
IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return READER_WORKSTATUS.readValue(json);
|
||||||
return mapper.readValue(json, DiskBalancerWorkStatus.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
|
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -263,8 +263,7 @@ public class DiskBalancer {
|
||||||
for (Map.Entry<String, FsVolumeSpi> entry : volMap.entrySet()) {
|
for (Map.Entry<String, FsVolumeSpi> entry : volMap.entrySet()) {
|
||||||
pathMap.put(entry.getKey(), entry.getValue().getBasePath());
|
pathMap.put(entry.getKey(), entry.getValue().getBasePath());
|
||||||
}
|
}
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return JsonUtil.toJsonString(pathMap);
|
||||||
return mapper.writeValueAsString(pathMap);
|
|
||||||
} catch (DiskBalancerException ex) {
|
} catch (DiskBalancerException ex) {
|
||||||
throw ex;
|
throw ex;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -69,6 +70,8 @@ import java.util.TreeSet;
|
||||||
* Common interface for command handling.
|
* Common interface for command handling.
|
||||||
*/
|
*/
|
||||||
public abstract class Command extends Configured {
|
public abstract class Command extends Configured {
|
||||||
|
private static final ObjectReader READER =
|
||||||
|
new ObjectMapper().reader(HashMap.class);
|
||||||
static final Logger LOG = LoggerFactory.getLogger(Command.class);
|
static final Logger LOG = LoggerFactory.getLogger(Command.class);
|
||||||
private Map<String, String> validArgs = new HashMap<>();
|
private Map<String, String> validArgs = new HashMap<>();
|
||||||
private URI clusterURI;
|
private URI clusterURI;
|
||||||
|
@ -441,11 +444,10 @@ public abstract class Command extends Configured {
|
||||||
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
|
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
|
||||||
String volumeNameJson = dnClient.getDiskBalancerSetting(
|
String volumeNameJson = dnClient.getDiskBalancerSetting(
|
||||||
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
|
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, String> volumeMap =
|
Map<String, String> volumeMap =
|
||||||
mapper.readValue(volumeNameJson, HashMap.class);
|
READER.readValue(volumeNameJson);
|
||||||
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
|
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
|
||||||
for (DiskBalancerVolume vol : set.getVolumes()) {
|
for (DiskBalancerVolume vol : set.getVolumes()) {
|
||||||
if (volumeMap.containsKey(vol.getUuid())) {
|
if (volumeMap.containsKey(vol.getUuid())) {
|
||||||
|
|
|
@ -18,12 +18,14 @@
|
||||||
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
|
package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
|
||||||
.DiskBalancerDataNode;
|
.DiskBalancerDataNode;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -35,6 +37,8 @@ import java.util.List;
|
||||||
public class JsonNodeConnector implements ClusterConnector {
|
public class JsonNodeConnector implements ClusterConnector {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(JsonNodeConnector.class);
|
LoggerFactory.getLogger(JsonNodeConnector.class);
|
||||||
|
private static final ObjectReader READER =
|
||||||
|
new ObjectMapper().reader(DiskBalancerCluster.class);
|
||||||
private final URL clusterURI;
|
private final URL clusterURI;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -56,9 +60,7 @@ public class JsonNodeConnector implements ClusterConnector {
|
||||||
Preconditions.checkNotNull(this.clusterURI);
|
Preconditions.checkNotNull(this.clusterURI);
|
||||||
String dataFilePath = this.clusterURI.getPath();
|
String dataFilePath = this.clusterURI.getPath();
|
||||||
LOG.info("Reading cluster info from file : " + dataFilePath);
|
LOG.info("Reading cluster info from file : " + dataFilePath);
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
DiskBalancerCluster cluster = READER.readValue(new File(dataFilePath));
|
||||||
DiskBalancerCluster cluster =
|
|
||||||
mapper.readValue(new File(dataFilePath), DiskBalancerCluster.class);
|
|
||||||
String message = String.format("Found %d node(s)",
|
String message = String.format("Found %d node(s)",
|
||||||
cluster.getNodes().size());
|
cluster.getNodes().size());
|
||||||
LOG.info(message);
|
LOG.info(message);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -25,9 +26,11 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Planner;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Planner;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.PlannerFactory;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.PlannerFactory;
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -69,6 +72,8 @@ public class DiskBalancerCluster {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(DiskBalancerCluster.class);
|
LoggerFactory.getLogger(DiskBalancerCluster.class);
|
||||||
|
private static final ObjectReader READER =
|
||||||
|
new ObjectMapper().reader(DiskBalancerCluster.class);
|
||||||
private final Set<String> exclusionList;
|
private final Set<String> exclusionList;
|
||||||
private final Set<String> inclusionList;
|
private final Set<String> inclusionList;
|
||||||
private ClusterConnector clusterConnector;
|
private ClusterConnector clusterConnector;
|
||||||
|
@ -118,8 +123,7 @@ public class DiskBalancerCluster {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static DiskBalancerCluster parseJson(String json) throws IOException {
|
public static DiskBalancerCluster parseJson(String json) throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return READER.readValue(json);
|
||||||
return mapper.readValue(json, DiskBalancerCluster.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -232,8 +236,7 @@ public class DiskBalancerCluster {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public String toJson() throws IOException {
|
public String toJson() throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return JsonUtil.toJsonString(this);
|
||||||
return mapper.writeValueAsString(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.codehaus.jackson.annotate.JsonIgnore;
|
import org.codehaus.jackson.annotate.JsonIgnore;
|
||||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.codehaus.jackson.map.ObjectReader;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -30,6 +32,9 @@ import java.io.IOException;
|
||||||
*/
|
*/
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
public class DiskBalancerVolume {
|
public class DiskBalancerVolume {
|
||||||
|
private static final ObjectReader READER =
|
||||||
|
new ObjectMapper().reader(DiskBalancerVolume.class);
|
||||||
|
|
||||||
private String path;
|
private String path;
|
||||||
private long capacity;
|
private long capacity;
|
||||||
private String storageType;
|
private String storageType;
|
||||||
|
@ -58,8 +63,7 @@ public class DiskBalancerVolume {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static DiskBalancerVolume parseJson(String json) throws IOException {
|
public static DiskBalancerVolume parseJson(String json) throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return READER.readValue(json);
|
||||||
return mapper.readValue(json, DiskBalancerVolume.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -305,8 +309,7 @@ public class DiskBalancerVolume {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public String toJson() throws IOException {
|
public String toJson() throws IOException {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
return JsonUtil.toJsonString(this);
|
||||||
return mapper.writeValueAsString(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue