文集文档索引

Hadoop


  • 文集信息
  • 目录大纲
  • 最新文档
  • 知识宇宙

文集详情

文集导读

Hadoop Hadoop 章节详解:原理、实践与代码示例 Hadoop 概述:大数据时代的基石 在信息爆炸的时代,数据量呈指数级增长,传统的数据处理方式已经无法满足海量数据的存储和分析需求。Hadoop 正是在这样的背景下应运而生,它是一个开源的分布式计算框架,专为处理大规模数据集而设计。Hadoop 能够将庞大的数据集分发到由普通服务器组成的集群中进行并行处理,从而实现高效、可靠、经济的大数据分析。 1.1 大数据的挑战与 Hadoop 的价值 传统数据处理面临的挑战主要集中在以下两个方面: 海量数据存储: 单个服务器的存储容量有限,无法容纳 PB 甚至 EB 级别的数据。 海量数据计算: 单台服务器的计算能力不足,无法在可接受的时间内完成对海量数据的分析。 Hadoop 的出现完美地解决了这些挑战: 分布式存储 (HDFS): Hadoop Distributed File System (HDFS) 将数据分散存储在集群中多台服务器上,实现了横向扩展的存储能力,可以轻松应对海量数据的存储需求。 分布式计算 (MapReduce & YARN): Hadoop MapReduce 编程模型将计算任务分解成多个子任务,并行运行在集群中的不同服务器上,大幅提升了数据处理速度。

Hadoop

Hadoop 章节详解:原理、实践与代码示例

1. Hadoop 概述:大数据时代的基石

在信息爆炸的时代,数据量呈指数级增长,传统的数据处理方式已经无法满足海量数据的存储和分析需求。Hadoop 正是在这样的背景下应运而生,它是一个开源的分布式计算框架,专为处理大规模数据集而设计。Hadoop 能够将庞大的数据集分发到由普通服务器组成的集群中进行并行处理,从而实现高效、可靠、经济的大数据分析。

1.1 大数据的挑战与 Hadoop 的价值

传统数据处理面临的挑战主要集中在以下两个方面:

  • 海量数据存储: 单个服务器的存储容量有限,无法容纳 PB 甚至 EB 级别的数据。

  • 海量数据计算: 单台服务器的计算能力不足,无法在可接受的时间内完成对海量数据的分析。

Hadoop 的出现完美地解决了这些挑战:

  • 分布式存储 (HDFS): Hadoop Distributed File System (HDFS) 将数据分散存储在集群中多台服务器上,实现了横向扩展的存储能力,可以轻松应对海量数据的存储需求。

  • 分布式计算 (MapReduce & YARN): Hadoop MapReduce 编程模型将计算任务分解成多个子任务,并行运行在集群中的不同服务器上,大幅提升了数据处理速度。YARN (Yet Another Resource Negotiator) 作为 Hadoop 的资源管理系统,负责集群资源的统一调度和管理,使得多种计算框架可以运行在 Hadoop 集群之上,进一步增强了 Hadoop 的灵活性和扩展性。

1.2 Hadoop 的核心组件

Hadoop 主要由以下三个核心组件构成:

  • HDFS (Hadoop Distributed File System): 分布式文件系统,负责海量数据的可靠存储。

  • MapReduce: 分布式计算框架,提供数据并行处理的编程模型。

  • YARN (Yet Another Resource Negotiator): 集群资源管理系统,负责集群资源的统一调度和管理。

除了核心组件外,Hadoop 生态系统还包含众多相关的工具和框架,例如:

  • Hive: 基于 Hadoop 的数据仓库工具,提供 SQL-like 查询语言 (HiveQL) 来分析存储在 Hadoop 中的数据。

  • Pig: 高级数据流语言和执行框架,用于快速开发和执行复杂的数据转换和分析任务。

  • HBase: NoSQL 数据库,提供高可靠、高性能、列式存储的分布式数据库服务。

  • Spark: 快速通用的大数据处理引擎,相比 MapReduce,Spark 在内存计算和迭代计算方面具有优势。

  • ZooKeeper: 分布式协调服务,用于管理和协调分布式应用程序。

1.3 Hadoop 的优势

Hadoop 作为大数据处理领域的基石,具有以下显著优势:

  • 高可靠性: HDFS 通过数据冗余备份机制,保证数据的高可靠性和容错性。即使部分节点发生故障,数据也不会丢失。

  • 高扩展性: Hadoop 可以通过简单地增加服务器节点来线性扩展集群的存储和计算能力,轻松应对数据规模的增长。

  • 高效性: MapReduce 编程模型将计算任务并行化处理,充分利用集群的计算资源,大幅提升数据处理效率。

  • 经济性: Hadoop 可以运行在廉价的普通服务器上,降低了构建大数据平台的成本。

  • 开源性: Hadoop 是开源项目,拥有庞大的社区支持,用户可以免费使用和修改。

2. HDFS 详解:分布式文件系统的基石

HDFS (Hadoop Distributed File System) 是 Hadoop 的核心组件之一,是一个高度容错性的分布式文件系统,被设计用于在廉价的硬件上运行。HDFS 提供了高吞吐量的数据访问,非常适合大规模数据集的应用。

2.1 HDFS 架构

HDFS 采用主从架构 (Master-Slave),主要由以下几个组件构成:

  • NameNode (NN): 主节点,负责管理文件系统的命名空间和元数据信息,例如文件目录结构、文件和块的映射关系、访问权限等。NameNode 存储元数据信息在内存中,并持久化到磁盘。

  • DataNode (DN): 从节点,负责存储实际的数据块。DataNode 接收来自客户端或 NameNode 的指令,执行数据块的读写操作。每个数据块会被复制多份 (默认 3 份) 存储在不同的 DataNode 上,以保证数据的高可靠性。

  • Secondary NameNode (SNN): 辅助 NameNode,并非 NameNode 的备份,而是辅助 NameNode 进行元数据管理的。SNN 定期从 NameNode 获取编辑日志 (EditLog) 和文件系统镜像 (FsImage),合并成新的文件系统镜像,并将其同步给 NameNode,从而减轻 NameNode 的负担,并提升系统性能。

2.2 HDFS 工作原理

2.2.1 数据写入流程

  1. 客户端 (Client) 向 NameNode 发起写请求。

  2. NameNode 检查目标文件是否存在,以及客户端是否有权限进行写操作。

  3. NameNode 返回允许写入,并告知客户端可用的 DataNode 列表。

  4. 客户端将数据分成多个数据块 (Block)。

  5. 客户端将数据块写入到 DataNode 管道 (Pipeline) 中。 数据块会依次写入到 DataNode 1,DataNode 1 复制到 DataNode 2,DataNode 2 复制到 DataNode 3,形成一个复制管道。

  6. DataNode 接收到数据块后,将其存储到本地磁盘,并向 NameNode 发送写入成功的报告。

  7. NameNode 接收到所有 DataNode 的报告后,更新元数据信息。

  8. 客户端接收到写入成功的响应。

2.2.2 数据读取流程

  1. 客户端 (Client) 向 NameNode 发起读请求。

  2. NameNode 检查目标文件是否存在,以及客户端是否有权限进行读操作。

  3. NameNode 返回允许读取,并告知客户端目标数据块所在的 DataNode 列表。

  4. 客户端选择就近的 DataNode 读取数据块。

  5. DataNode 将数据块返回给客户端。

  6. 客户端将读取到的数据块组装成完整的文件。

2.3 HDFS 代码实践 (Java API)

以下代码示例演示了使用 Hadoop Java API 操作 HDFS 的基本操作,例如创建目录、上传文件、下载文件、删除文件等。

pom.xml (添加 Hadoop 依赖)

<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.1</version> <!-- 请根据实际 Hadoop 版本修改 --> </dependency> </dependencies>

HDFS 操作示例代码 (Java)

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; public class HDFSDemo { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); // 配置 HDFS 地址,如果 Hadoop 集群配置了 core-site.xml,可以省略 // conf.set("fs.defaultFS", "hdfs://your-namenode-host:9000"); FileSystem fs = FileSystem.get(conf); // 获取 FileSystem 对象,默认连接本地 Hadoop 集群 // 如果需要连接指定 HDFS,可以使用以下方式 // FileSystem fs = FileSystem.get(URI.create("hdfs://your-namenode-host:9000"), conf); // 1. 创建目录 Path dirPath = new Path("/test-dir"); boolean mkdirSuccess = fs.mkdirs(dirPath); System.out.println("Create directory " + dirPath + ": " + mkdirSuccess); // 2. 上传文件 Path localFilePath = new Path("local-file.txt"); // 本地文件路径 Path hdfsFilePath = new Path(dirPath, "uploaded-file.txt"); // HDFS 目标路径 try (InputStream in = System.in; // 这里为了演示方便,从标准输入读取,实际应用中替换为 FileInputStream OutputStream out = fs.create(hdfsFilePath)) { IOUtils.copyBytes(in, out, conf); System.out.println("Upload file " + localFilePath + " to " + hdfsFilePath + " success."); } // 3. 下载文件 Path downloadHdfsFilePath = hdfsFilePath; Path localDownloadPath = new Path("downloaded-file.txt"); try (InputStream in = fs.open(downloadHdfsFilePath); OutputStream out = System.out) { // 这里为了演示方便,输出到标准输出,实际应用中替换为 FileOutputStream IOUtils.copyBytes(in, out, conf); System.out.println("Download file " + downloadHdfsFilePath + " to " + localDownloadPath + " success."); } // 4. 删除文件或目录 Path deletePath = dirPath; // 可以删除目录或文件 boolean deleteSuccess = fs.delete(deletePath, true); // true 表示递归删除目录 System.out.println("Delete path " + deletePath + ": " + deleteSuccess); fs.close(); // 关闭 FileSystem } }

代码详解:

  • Configuration conf = new Configuration();: 创建 Hadoop 配置对象,用于加载 Hadoop 配置文件 (core-site.xml, hdfs-site.xml 等)。

  • FileSystem fs = FileSystem.get(conf);: 获取 FileSystem 对象,用于操作 HDFS。默认会连接本地 Hadoop 集群。如果需要连接指定的 HDFS 集群,可以使用 FileSystem.get(URI.create("hdfs://your-namenode-host:9000"), conf);

  • fs.mkdirs(dirPath);: 创建 HDFS 目录。

  • fs.create(hdfsFilePath);: 创建 HDFS 文件,并返回 OutputStream 用于写入数据。

  • fs.open(downloadHdfsFilePath);: 打开 HDFS 文件,并返回 InputStream 用于读取数据。

  • IOUtils.copyBytes(in, out, conf);: Hadoop 提供的工具类,用于高效地进行数据流的复制。

  • fs.delete(deletePath, true);: 删除 HDFS 文件或目录。第二个参数 true 表示递归删除目录及其子目录和文件。

  • fs.close();: 关闭 FileSystem 连接。

运行代码:

  1. 确保你的开发环境配置了 Hadoop 客户端,并且可以连接到 Hadoop 集群。

  2. 编译并运行 Java 代码。

  3. 根据代码提示,输入要上传到 HDFS 的内容。

  4. 代码将执行创建目录、上传文件、下载文件、删除目录等操作,并在控制台输出结果。

注意: 在实际应用中,需要根据具体的 Hadoop 集群配置修改代码中的 HDFS 地址,以及替换示例代码中标准输入输出为实际的文件输入输出流。

3. MapReduce 详解:分布式计算的引擎

MapReduce 是 Hadoop 的核心组件之一,是一个用于处理大规模数据集的编程模型和计算框架。MapReduce 将复杂的分布式计算过程抽象成两个主要的阶段:Map 和 Reduce,使得开发者可以专注于业务逻辑的实现,而无需过多关注底层分布式细节。

3.1 MapReduce 工作原理

MapReduce 的核心思想是 "分而治之"。它将输入数据切分成多个小的数据块 (Input Splits),分配给集群中的多个 Map 任务并行处理,Map 任务的输出结果经过 Shuffle 和 Sort 阶段,最终由 Reduce 任务进行汇总和处理,得到最终的结果。

3.1.1 Map 阶段

  • Input Splits: MapReduce 框架将输入数据分割成多个独立的 Input Splits,每个 Input Split 通常对应一个 Map 任务。

  • Map Task: 每个 Map 任务读取一个 Input Split 的数据,并根据用户自定义的 Map 函数进行处理。Map 函数接收 Key-Value 对作为输入,并输出一组新的 Key-Value 对作为中间结果。

  • Output: Map 任务的输出结果 (Intermediate Data) 会被写入本地磁盘。

3.1.2 Shuffle & Sort 阶段

  • Shuffle: Shuffle 阶段是 MapReduce 的核心阶段,负责将 Map 任务输出的中间结果,按照 Key 进行分区 (Partition) 和排序 (Sort),并将相同 Key 的数据汇集到同一个 Reduce 任务。

  • Partition: 决定 Map 任务输出的 Key-Value 对由哪个 Reduce 任务处理。默认的分区策略是 HashPartitioner,根据 Key 的 Hash 值对 Reduce 任务数量取模来决定分区。用户也可以自定义分区策略。

  • Sort: 对每个分区内的数据按照 Key 进行排序。

  • Spill: 当 Map 任务的中间结果数据量超过一定阈值时,会溢写 (Spill) 到磁盘,并在磁盘上进行归并排序 (Merge Sort)。

  • Merge: 当所有 Map 任务完成后,Shuffle 阶段会将所有溢写到磁盘的中间结果文件进行归并 (Merge),形成最终的输入给 Reduce 任务的 Partitioned & Sorted Data。

3.1.3 Reduce 阶段

  • Reduce Task: 每个 Reduce 任务接收一个或多个分区的数据 (Partitioned & Sorted Data),并根据用户自定义的 Reduce 函数进行处理。Reduce 函数接收相同 Key 的一组 Value 集合作为输入,并输出最终的结果。

  • Output: Reduce 任务的输出结果 (Output Data) 会被写入到 HDFS。

3.2 MapReduce 代码实践 (Java API - WordCount)

以下代码示例演示了经典的 WordCount MapReduce 程序,统计输入文本文件中每个单词出现的次数。

WordCount Mapper (WordMapper.java)

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); // 输出 (word, 1) 键值对 } } }

WordCount Reducer (WordReducer.java)

import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; Iterator<IntWritable> iterator = values.iterator(); while (iterator.hasNext()) { sum += iterator.next().get(); // 累加相同单词的计数 } result.set(sum); context.write(key, result); // 输出 (word, count) 键值对 } }

WordCount Driver (WordCount.java)

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); // 创建 Job 对象 job.setJarByClass(WordCount.class); // 设置 Job 驱动类 job.setMapperClass(WordMapper.class); // 设置 Mapper 类 job.setReducerClass(WordReducer.class); // 设置 Reducer 类 job.setOutputKeyClass(Text.class); // 设置输出 Key 类型 job.setOutputValueClass(IntWritable.class); // 设置输出 Value 类型 FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置输出路径 boolean success = job.waitForCompletion(true); // 提交 Job 并等待完成 System.exit(success ? 0 : 1); } }

代码详解:

  • WordMapper:

    • 继承 Mapper<LongWritable, Text, Text, IntWritable>,定义 Mapper 的输入 Key 类型 (LongWritable - 行偏移量), 输入 Value 类型 (Text - 行内容), 输出 Key 类型 (Text - 单词), 输出 Value 类型 (IntWritable - 计数)。

    • map() 方法:接收输入 Key-Value 对,将每一行文本分割成单词,并将每个单词作为 Key,计数 1 作为 Value,输出 (word, 1) 键值对。

  • WordReducer:

    • 继承 Reducer<Text, IntWritable, Text, IntWritable>,定义 Reducer 的输入 Key 类型 (Text - 单词), 输入 Value 类型 (IntWritable - 计数), 输出 Key 类型 (Text - 单词), 输出 Value 类型 (IntWritable - 总计数)。

    • reduce() 方法:接收相同单词的 Key 和一组计数 Value 集合,累加计数 Value,并将单词和总计数作为 Key-Value 对输出。

  • WordCount Driver:

    • 创建 ConfigurationJob 对象,配置 Job 的相关参数。

    • job.setJarByClass(WordCount.class);: 设置 Job 的驱动类,方便 Hadoop 框架找到 Mapper 和 Reducer 类。

    • job.setMapperClass(WordMapper.class);job.setReducerClass(WordReducer.class);: 设置 Job 使用的 Mapper 和 Reducer 类。

    • job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);: 设置 Job 的输出 Key 和 Value 类型,需要与 Reducer 的输出类型一致。

    • FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));: 设置 Job 的输入路径和输出路径,从命令行参数获取。

    • job.waitForCompletion(true);: 提交 Job 到 Hadoop 集群并等待 Job 完成。

运行代码:

  1. 确保你的开发环境配置了 Hadoop 客户端,并且可以连接到 Hadoop 集群。

  2. 将 WordMapper.java, WordReducer.java, WordCount.java 编译成 Class 文件,并打包成 JAR 文件 (例如 wordcount.jar)。

  3. 将输入文本文件上传到 HDFS (例如 input.txt)。

  4. 运行 Hadoop 命令提交 MapReduce Job:

    hadoop jar wordcount.jar WordCount hdfs://your-namenode-host:9000/input.txt hdfs://your-namenode-host:9000/output
    • hadoop jar wordcount.jar: 运行 JAR 文件。

    • WordCount: 主类名 (Driver 类)。

    • hdfs://your-namenode-host:9000/input.txt: 输入文件 HDFS 路径。

    • hdfs://your-namenode-host:9000/output: 输出文件 HDFS 路径 (输出目录必须不存在)。

  5. Job 运行完成后,可以在 HDFS 的输出目录下查看结果文件 (part-r-00000),其中包含了每个单词及其出现的次数。

注意: 在实际应用中,需要根据具体的 Hadoop 集群配置修改命令中的 HDFS 地址,以及替换示例代码中的输入输出路径为实际的文件路径。

4. YARN 详解:集群资源管理的调度者

YARN (Yet Another Resource Negotiator) 是 Hadoop 2.0 引入的集群资源管理系统,它将 MapReduce 框架中的资源管理和作业调度功能分离出来,使得 Hadoop 可以支持多种计算框架 (例如 Spark, Tez, Flink 等),而不仅仅是 MapReduce。YARN 提供了更灵活、更高效的集群资源管理和调度机制。

4.1 YARN 架构

YARN 采用主从架构,主要由以下几个组件构成:

  • ResourceManager (RM): 主节点,负责整个集群的资源管理和调度。RM 接收客户端提交的作业,并为作业分配资源 (Container)。RM 还负责监控 NodeManager 的状态,以及作业的运行状态。

  • NodeManager (NM): 从节点,运行在集群中的每台服务器上,负责管理本节点的资源 (CPU, 内存等),并接收 ResourceManager 的指令,启动和监控 Container。

  • ApplicationMaster (AM): 每个应用程序 (例如 MapReduce Job, Spark Application 等) 对应一个 ApplicationMaster。AM 负责应用程序的生命周期管理,向 ResourceManager 申请资源,并将任务调度到 Container 上运行,监控任务的运行状态,并在任务失败时进行重试。

  • Container: YARN 中的资源分配单位,封装了 CPU、内存等资源。每个任务 (例如 Map Task, Reduce Task) 运行在一个 Container 中。

4.2 YARN 工作流程

  1. 客户端 (Client) 向 ResourceManager 提交应用程序 (Application)。

  2. ResourceManager 接收到应用程序提交请求后,从集群中选择一个 NodeManager 启动 ApplicationMaster。

  3. ApplicationMaster 启动后,向 ResourceManager 注册应用程序。

  4. ApplicationMaster 向 ResourceManager 申请资源 (Container)。

  5. ResourceManager 为 ApplicationMaster 分配 Container,并告知 ApplicationMaster 可用的 NodeManager 列表。

  6. ApplicationMaster 连接到 NodeManager,并请求 NodeManager 启动 Container。

  7. NodeManager 启动 Container,并为 Container 分配资源 (CPU, 内存等)。

  8. ApplicationMaster 将任务调度到 Container 上运行。

  9. Container 运行任务,并将任务运行状态汇报给 ApplicationMaster。

  10. ApplicationMaster 监控任务的运行状态,并在任务失败时进行重试。

  11. 应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销应用程序,并释放资源。

4.3 YARN 的优势

  • 多框架支持: YARN 将资源管理和作业调度分离,使得 Hadoop 可以支持多种计算框架,例如 MapReduce, Spark, Tez, Flink 等,提高了 Hadoop 集群的资源利用率和灵活性。

  • 资源隔离: YARN 通过 Container 实现了资源隔离,不同的应用程序运行在独立的 Container 中,互不影响,提高了系统的稳定性和安全性。

  • 高效的资源调度: YARN 的 ResourceManager 采用多种调度策略 (例如 FIFO, Capacity Scheduler, Fair Scheduler 等),可以根据不同的应用场景和资源需求,进行高效的资源调度,提高集群的整体性能。

目录大纲

    最新文档

    知识宇宙

    正在加载知识图谱...


    转发