Hadoop's MapReduce Distributed Computing Framework

This article explains Hadoop's MapReduce distributed computing framework and the troubleshooting steps that go with it. MapReduce is one of the core components of the Hadoop ecosystem, used for large-scale data processing. Below we look at how it works, how to implement a job, and how to resolve common problems.

  1. How MapReduce works

MapReduce is a programming model with two user-defined phases, Map and Reduce, connected by a Shuffle and Sort step that the framework performs automatically.

1.1 Map phase:

  • Input data is split into independent chunks that are processed in parallel by map tasks
  • Each map task processes one input split and emits intermediate key-value pairs

1.2 Shuffle and Sort phase:

  • Map output is partitioned and sorted by key
  • Records with the same key are grouped together so they can be sent to the same reducer

1.3 Reduce phase:

  • Reduce tasks process the intermediate results coming from the Map phase
  • The values for each key are typically aggregated
  • The final output is written (usually to HDFS)

mermaid

graph TD
A[Input] --> B[Split]
B --> C[Map]
C --> D[Shuffle]
D --> E[Sort]
E --> F[Reduce]
F --> G[Output]
style C fill:#f9f,stroke:#333,stroke-width:2px
style F fill:#f9f,stroke:#333,stroke-width:2px
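
To make the data flow concrete, consider a hypothetical word-count input consisting of the two lines "hello world" and "hello hadoop" (the data is purely illustrative):

  • Map emits: (hello, 1), (world, 1), (hello, 1), (hadoop, 1)
  • Shuffle/Sort groups and orders by key: (hadoop, [1]), (hello, [1, 1]), (world, [1])
  • Reduce sums each group: (hadoop, 1), (hello, 2), (world, 1)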

  2. Implementing a MapReduce job

2.1 Write the map function:

java

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Split each input line into tokens and emit (word, 1) for every token
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

2.2 Write the reduce function:

java

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Sum all counts for a given word and emit (word, total)
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

2.3 Configure the job:

java

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
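
For completeness, the three fragments above assume the usual WordCount imports from the new org.apache.hadoop.mapreduce API; a typical set looks like this:

java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

The packaged jar can then be submitted with, for example, hadoop jar wordcount.jar WordCount <input_path> <output_path> (the jar and class names here are illustrative).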

  3. MapReduce optimization techniques

3.1 Use a Combiner: a combiner performs partial aggregation on the map side, reducing the volume of data shuffled across the network. Reusing the reducer as the combiner, as below, is only correct when the reduce operation is commutative and associative (summation in word count is):

java
job.setCombinerClass(Reduce.class);

3.2 Custom partitioner: control how keys are mapped to reducers:

java

public static class CustomPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask off the sign bit so the partition index is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

job.setPartitionerClass(CustomPartitioner.class);

3.3 Set an appropriate number of reducers (too few limits parallelism; too many adds scheduling overhead and produces many small output files):

java
job.setNumReduceTasks(10);

  4. MapReduce troubleshooting

4.1 Check the job status: use the Hadoop web UI (ResourceManager / JobHistory Server) or the command line:

bash
hadoop job -status <job_id>

4.2 Check the job logs: view them in the JobHistory web UI, or fetch the aggregated logs from HDFS with the command line:

bash
yarn logs -applicationId <application_id>

4.3 Common problems and how to fix them:

a) Data skew:

  • Symptom: a few reducers run far longer than the rest
  • Fix: redesign the keys and/or use a custom partitioner (see the sketch below)
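
One common mitigation, sketched below, is to "salt" the keys in the mapper with a small random prefix so that a single hot key is spread across several reducers; a second aggregation step then strips the prefix and merges the partial results. The class and field names here are illustrative, not part of the Hadoop API, and the sketch assumes the same imports as the WordCount example plus java.util.Random:

java

// Hypothetical "salting" mapper: spreads each key over NUM_SALTS partitions
public static class SaltingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int NUM_SALTS = 10;          // shards per key; tune to the observed skew
    private final Random random = new Random();
    private final IntWritable one = new IntWritable(1);
    private final Text saltedKey = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            // Prefix each key with a random salt so identical keys land on different reducers
            saltedKey.set(random.nextInt(NUM_SALTS) + "_" + tokenizer.nextToken());
            context.write(saltedKey, one);
        }
    }
}

Each reducer then emits partial aggregates per salted key; a lightweight follow-up job (or post-processing step) removes the salt prefix and merges the partials into the final per-key totals.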

b) Out-of-memory errors:

  • Symptom: tasks fail with Java heap space errors
  • Fix: increase the memory allocated to mapper/reducer containers, e.g. in mapred-site.xml:

xml

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>2048</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>3072</value>
</property>
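
If the settings only need to apply to one job rather than cluster-wide, the same properties can be set on the job's Configuration before the Job is created; the values below are illustrative, and the JVM heap (mapreduce.*.java.opts) should stay below the container size, commonly around 75-80% of it:

java

Configuration conf = new Configuration();
conf.setInt("mapreduce.map.memory.mb", 2048);      // container size for map tasks
conf.setInt("mapreduce.reduce.memory.mb", 3072);   // container size for reduce tasks
// Keep the JVM heap below the container size
conf.set("mapreduce.map.java.opts", "-Xmx1638m");
conf.set("mapreduce.reduce.java.opts", "-Xmx2458m");
Job job = Job.getInstance(conf, "word count");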

c) Slow (straggler) tasks:

  • Symptom: a few tasks take much longer to execute than the rest
  • Fix: enable speculative execution

xml

<property>
  <name>mapreduce.map.speculative</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>true</value>
</property>

  5. Advanced MapReduce features

5.1 Secondary sort: implement a custom WritableComparable to control how records are ordered when they reach the reducer:

java

public class CompositeKey implements WritableComparable<CompositeKey> {
    private Text firstKey;
    private IntWritable secondKey;

    // Constructors, getters, setters, etc.

    @Override
    public int compareTo(CompositeKey o) {
        // Sort by the first (natural) key, then by the second key
        int cmp = this.firstKey.compareTo(o.firstKey);
        if (cmp != 0) {
            return cmp;
        }
        return this.secondKey.compareTo(o.secondKey);
    }

    // write() and readFields() implementations
}
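
For a full secondary sort the composite key alone is not enough: the job also needs a partitioner and a grouping comparator that look only at the natural (first) key, so that all records for a given firstKey reach the same reducer and are presented as a single group. A minimal wiring sketch is shown below; FirstKeyPartitioner, FirstKeyGroupingComparator, the getFirstKey() getter, and the Text value type are assumptions for illustration, and the usual org.apache.hadoop.io and org.apache.hadoop.mapreduce imports are assumed:

java

// Partition on the natural key only, so equal firstKeys go to the same reducer
public static class FirstKeyPartitioner extends Partitioner<CompositeKey, Text> {
    @Override
    public int getPartition(CompositeKey key, Text value, int numPartitions) {
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group records into one reduce() call per natural key, ignoring the secondary key
public static class FirstKeyGroupingComparator extends WritableComparator {
    protected FirstKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((CompositeKey) a).getFirstKey().compareTo(((CompositeKey) b).getFirstKey());
    }
}

// Job wiring
job.setPartitionerClass(FirstKeyPartitioner.class);
job.setGroupingComparatorClass(FirstKeyGroupingComparator.class);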

5.2 Chained MapReduce: run multiple MapReduce jobs with dependencies between them, using JobControl and ControlledJob from org.apache.hadoop.mapreduce.lib.jobcontrol:

java

Job job1 = Job.getInstance(conf, "job 1");
// configure job1

Job job2 = Job.getInstance(conf, "job 2");
// configure job2

// Wrap the jobs so dependencies can be declared between them
ControlledJob cJob1 = new ControlledJob(job1, null);
ControlledJob cJob2 = new ControlledJob(job2, null);
cJob2.addDependingJob(cJob1);  // job2 starts only after job1 has finished

JobControl jobControl = new JobControl("jobChain");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);

// JobControl implements Runnable, so it is driven from its own thread
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();

while (!jobControl.allFinished()) {
    Thread.sleep(500);
}
jobControl.stop();

5.3 Using counters: define and increment custom counters inside map and reduce:

java

public static enum CustomCounter {
    MALFORMED_RECORDS,
    PROCESSED_RECORDS
}

// Inside a map or reduce method
context.getCounter(CustomCounter.PROCESSED_RECORDS).increment(1);
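
After the job completes, the counter values can be read back in the driver; a small sketch, reusing the job variable from section 2.3:

java

if (job.waitForCompletion(true)) {
    long processed = job.getCounters()
            .findCounter(CustomCounter.PROCESSED_RECORDS)
            .getValue();
    System.out.println("Processed records: " + processed);
}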

  6. MapReduce performance tuning

6.1 Choose a suitable InputFormat: pick one that matches the data, such as TextInputFormat for plain text, SequenceFileInputFormat for binary sequence files, or CombineTextInputFormat when the input consists of many small files.
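
For example, switching from plain text to SequenceFile input only requires changing the input format on the job (assuming org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat is imported and the mapper's input key/value types match what the format produces):

java

// The default is TextInputFormat (LongWritable offset, Text line)
job.setInputFormatClass(SequenceFileInputFormat.class);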

6.2 Compression: use compression to reduce disk I/O and network transfer:

java

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
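
Compressing the intermediate map output is often an even bigger win, since it shrinks the data that has to be shuffled. A sketch using Snappy, assuming the Snappy native libraries are available on the cluster and org.apache.hadoop.io.compress.SnappyCodec / CompressionCodec are imported:

java

Configuration conf = job.getConfiguration();
// Compress map output before it is shuffled to the reducers
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);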

6.3 JVM reuse: for small jobs made up of many short tasks, the task JVM can be reused to avoid paying the startup cost per task (a value of -1 means unlimited reuse):

xml

<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>-1</value>
</property>
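
Note that the JVM-reuse setting above originates from classic (MRv1) MapReduce and is generally not honored by MRv2 on YARN; for very small jobs the closer modern equivalent is uber mode, which runs all of a job's tasks inside the ApplicationMaster JVM. A per-job sketch (threshold values shown are the usual defaults and purely illustrative):

java

Configuration conf = job.getConfiguration();
// Run the whole job inside the ApplicationMaster JVM if it is small enough
conf.setBoolean("mapreduce.job.ubertask.enable", true);
// Optional thresholds controlling what counts as "small enough"
conf.setInt("mapreduce.job.ubertask.maxmaps", 9);
conf.setInt("mapreduce.job.ubertask.maxreduces", 1);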

These steps and advanced features should give you a deeper understanding of the Hadoop MapReduce framework and how to apply it.
