数据清洗(ETL)案例实操

文章目录

数据清洗(ETL)概述案例需求和分析代码实现和结果分析

数据清洗(ETL)概述

“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库中,但其对象并不限于数据仓库。

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

案例需求和分析

有一个日志数据集,我们要去除日志中字段个数小于等于11的日志。 部分数据集: 我们期望输出的数据每行字段长度都大于11,所以需要在Map阶段对输入的数据根据规则进行过滤清洗。

代码实现和结果分析

package etl;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebLogDriver {

public static void main(String[] args) throws Exception {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置

args = new String[] { "D:/input/inputlog.txt", "D:\\hadoop\\output" };

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(WebLogDriver.class);

// 3 关联map

job.setMapperClass(WebLogMapper.class);

//4设置map的输出类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(NullWritable.class);

//5 设置最终输出类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

// 6取消reduce阶段,设置reducetask个数为0

job.setNumReduceTasks(0);

// 7 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

//8 提交

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);

}

}

package etl;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WebLogMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取一行

String line = value.toString();

//ETL

boolean result = parseLog(line,context);

if (!result){

return;

}

//写出

context.write(value,NullWritable.get());

}

private boolean parseLog(String line, Context context) {

String[] fields = line.split(" ");

if (fields.length>11){

return true;

}else {

return false;

}

}

}

输出结果:

可以发现数据清洗后少了很多行,这就把不符合要求的数据去除掉了。

`