HDFS

Francio PKU_CCME

摘要

本实验实现了HDFS上文件的一般性读写删操作,并对其效率进行了分析,结果表明一般性的操作会导致小文件量大时读写效率急剧降低。在此基础上,我们测试了使用 SequenceFile 技术实现的文件读写操作,获得了相关数据并与一般性操作的数据进行了比较,验证了该技术在处理大量小文件时优秀的性能。在文末列举了HDFS其他一些处理大量小文件的常用方法。

实验环境

虚拟机 处理器 内存 硬盘 操作系统 Hadoop 版本
Master Inter(R) Core(TM) i7-8750H @ 2.20GHz 3 GB 40 GB ubuntu-18.04.4-desktop-amd64 2.7.7
Slave1 Inter(R) Core(TM) i7-8750H @ 2.20GHz 2 GB 40 GB ubuntu-18.04.4-desktop-amd64 2.7.7
Slave2 Inter(R) Core(TM) i7-8750H @ 2.20GHz 2 GB 40 GB ubuntu-18.04.4-desktop-amd64 2.7.7

实验步骤

文件的初级读写方法实现

此步实验的目的为用最基本的读写文件的方法检验 HDFS 的读写性能,并与后续改进的读写方法进行对比。

程序入口

此步骤中共设置三个可调参数file_num round_numdata_num,分别代表创建并读取的文件个数、单个文件的大小与收集到的平行数据的组数。

配置文件属性的部分较教学网上的示例代码多了两行,分别为conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enabled", true);,目的为规避DataNode数量过少或单个DataNode被剔除导致的异常。

读写性能主要通过运行耗费的时间体现,通过System.currentTimeMillis();获得运行前后的时间点,相减即为运行所耗费时间。定义两个长整形变量createTimeTotal deleteTimeTotal记录多次创建和删除操作的总时长,最终输出结果为数据的平均值。

测试分为四部分进行,具体分组见代码中的注释。每次程序只运行编号为 1-4 的四段代码中的一段,按由1到4的顺序执行。其中第2步和第3步为测试读文件所需时间,在主程序输出总时间外还会输出更详细的时间数据,详细代码参见本小结的“读取文件”部分。

public static void main(String[] args) {
        try {
            int file_num = 500; //10, 50, 100, 500, 1000,2000
        	int round_num = 1;//10, 50, 100, 500, 1000,2000
        	int data_num = 5;
        	
        	long createTimeTotal = 0l;
        	long deleteTimeTotal = 0l;
        	
        	Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://Master:9000");
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enabled", true);
            FileSystem fs = FileSystem.get(conf);

			for (int k = 0;k<data_num;k++) {
				Long totalStartTime1 = System.currentTimeMillis();
				createFile(fs, file_num, round_num);
				Long totalEndTime1 = System.currentTimeMillis();
				createTimeTotal += (totalEndTime1 - totalStartTime1);
				//recordTime(fs, file_num);
				//recordTimeInterval(fs, file_num);
				Long totalStartTime2 = System.currentTimeMillis();
				deleteFiles(fs, file_num);
				Long totalEndTime2 = System.currentTimeMillis();
				deleteTimeTotal += (totalEndTime2 - totalStartTime2);
			}
				System.out.println("creat: " + createTimeTotal/data_num);
				System.out.println("delete: " + deleteTimeTotal/data_num);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

创建文件

文件创建部分设置了两个参数file_numround_num分别用于控制生成的文件个数和文件大小。文件的创建使用FileSystem类下的creat()方法,文件内容使用字节流的形式,利用write()方法写入。

public static void createFile(FileSystem fs, int file_num, int round_num) throws Exception {
		for(int i = 0; i < file_num; i++) {
			String file_name = "test_" + i;
			Path file_path = new Path(file_name);
			byte[] buff = file_name.getBytes();
			FSDataOutputStream os = fs.create(file_path);
			os.write(buff,0,buff.length);
			for(int j = 0; j < round_num; j++){
				String round = "round_" + j;
				byte[] buff2 = round.getBytes();
				os.write(buff2,0,buff2.length);
			}
			os.close();
 			System.out.println("Create: " + file_path.getName());
 			
 			FileStatus fileStatus = fs.getFileLinkStatus(file_path);
 			System.out.println("BlockSize: " + fileStatus.getBlockSize());
		}
    System.out.println("Create: Completed" );
	}

在文件创建过程中使用FileStatus类获取文件的属性,进而获得创建的文件所占的 BlockSize 大小。为应对初期可能出现的生成文件的异常,在生成文件时设置了一个System.out.println("Create: " + file_path.getName());以实时输出文件生成过程。在实际测试时应将本行注释掉以方便查找输出的实验结果,并提高运行速度。

读取文件

本步骤中使用缓冲字符流的方式打开并读取文件,使用readline()方式获取文件内容并输出。

public static void readFile(FileSystem fs, Path file_path) throws Exception{
		FSDataInputStream in = fs.open(file_path);
		BufferedReader d = new BufferedReader(new InputStreamReader(in));
		String content = d.readLine();
		System.out.println();
		d.close();
		in.close();
	}

使用与main方法中一致的获取时间的方法,获取遍历读取全部文件的时间:

public static void recordTime(FileSystem fs, int file_num) throws Exception{
		String fileName = "test_";
		Long startTime = System.currentTimeMillis();
		for (int i = 0; i < file_num; i++) {
			String realName = fileName  + i;
			Path file_path = new Path(realName);
			readFile(fs, file_path);
		}
		Long endTime = System.currentTimeMillis();
		System.out.println(endTime - startTime);
	}

在收集上述总读取时间的数据过程中,出现了随着读取文件数量的增加,单个文件的平均读取速度加快的现象,与预期实验结果不符,于是又增添了如下的一个方法,获得读取单个文件的耗时,一次性读取2000个文件进行测试。为方便大量结果的收集,规避因输出导致的运行速度拖慢,我们引入了收集结果的数组,最后使用Arrays.toString()方法将数组一并输出。

为规避数组越界异常,收集结果的数组定义的容量较数据量多一位,在处理数据时应将最后一位消除。

public static void recordTimeInterval(FileSystem fs, int file_num) throws Exception{
		String fileName = "test_";
		long[] timeList = new long[2001];
		for (int i = 0; i < file_num; i++) {
			String realName = fileName  + i;
			Path file_path = new Path(realName);
			Long startTime = System.currentTimeMillis();
			readFile(fs, file_path);
			Long endTime = System.currentTimeMillis();
			timeList[i] = endTime - startTime;
		}
		System.out.println(Arrays.toString(timeList));
	}

删除文件

删除文件的方法如下:

public static void deleteFiles(FileSystem fs, int file_num) throws Exception{
		for(int i = 0; i < file_num; i++) {
			String file_name = "test_" + i;
			Path file = new Path(file_name);
			fs.delete(file, true);
		}
		System.out.println("Delete Completed");
	}

数据收集

本部分中所有数据均为五次实验的平均值。

生成文件数量(个) 10 50 100 500 1000 2000
耗时(ms) 219 653 1240 4952 8651 16600
删除文件数量(个) 10 50 100 500 1000 2000
耗时(ms) 11 58 62 205 343 602
读取文件数量(个) 10 50 100 500 1000 2000
总用时(ms) 202.6 455.8 711.4 1932.6 2841.6 4540.0
平均单个文件用时(ms) 20.26 9.12 7.11 3.87 2.84 2.27

image-20200713222729737

从图中可以清楚的看到,该方法的读写删操作耗时与文件数量基本上呈线性相关,处理千数量级的文件时已近需要用秒来计时了,对于更大量的数据,显然这种处理方式的时间成本过高,是难以接受的。尤其是在创建文件的操作,单独生成2000个文件就已经消耗了接近17秒时间。

读取单个文件的实测时间

前五次测试所得平均值如下图所示,可以明显地看是个别文件读取速度过慢,使得总的读取时间被严重拖慢。

image-20200713175008675

由于读取单个文件的时间远超100毫秒是异常现象,在设计实验时没有预料到,故我们又加测了五次,后五次实验平均值作图如下:

image-20200713181806778

此图的结果较试验预期较符合。复查前五次实验数据,发现第二次读取操作时个别文件的读取时间高达1900毫秒,这些数据是前五次实验数据中出现异常值的根源。

由这实验可以观察得到如下现象:

  • 读取开始时,第一个文件读取速度远慢于后续文件,通常耗时在140毫秒左右,剩余文件读取速度在0-5毫秒范围内波动。读取开始时速度波动较大,而后趋于稳定。起始文件读取慢解释了前述随着读取文件数量的增加,单个文件的平均读取速度加快的异常现象:随着数据量增加,起始文件造成的速度拖慢在总时间中的占比逐渐减小。
  • 读取过程中偶尔会出现读取慢的数据,可能是由于文件块变化,需要重新寻找文件导致的。
  • Block Size:134217728,折合 128 MB

使用 SequenceFile 实现文件读写

SequenceFile文件是Hadoop用来存储二进制形式[key,value]对的一种平面文件,可以将其看作为一个容器,将所有文件打包到SequenceFile类中可以高效地对小文件进行存储和处理。

程序入口

本小节 main 方法和 delete 方法与前述实验步骤一致,仅对 createFile 与 recordTime 方法进行重写。

创建文件

首先定义 SequenceFile 文件的文件名 “test_file” 。适用对 SequenceFile 进行写操作的方法有很多,一般的 Writable 方法或自定义的Writable 方法均可使用。此处使用 Text 类对其 key 赋值,使用 IntWritable 方法对其 value 赋值。

由于该方法允许使用 append 方法在已有文件末尾追加新内容,故此处利用 append 方法追加新的 [key,value] 对,保证文件生成的连续性。

写入完毕需要使用closeStream()方法关闭文件,之后就可以利用上一小节出现过的getFileLinkStatus()获取到包含 blocksize 等信息的文件的基本属性。

public static void createFiles(FileSystem fs, int fileNum, Configuration conf) throws Exception{
    	String fileName = "test_file";
		Path file = new Path(fileName);
		
		SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file, Text.class, IntWritable.class);
	    Text key = new Text();
	    IntWritable value = new IntWritable();
	    
	    for(int i = 1; i <= fileNum; i++) {
	    	String content="test_" + i;
            key.set(content);
            value.set(i);
            System.out.printf("%s\t%s\n", key, value);
            writer.append(key, value);
        }
	    
        IOUtils.closeStream(writer);
    	FileStatus fileStatus = fs.getFileLinkStatus(file);
		System.out.println("BlockSize: " + fileStatus.getBlockSize());
    }

读取文件

获得文件路径后,使用 SequenceFile.Reader 将待读取的文件实例化。由于前面创建的文件中仅包含 key 与 value 信息,可以通过getKeyClass()getValueClass()方法获得。为了将结果输出,还需将得到的 key 与 value 强制转型为 Writable 型。

获取到的 reader 对象中储存着一系列 [key,value] 对,故可以使用next()方法对其进行遍历。将 [key,value] 对输出以监测程序运行情况。

public static void recordTime(FileSystem fs, Configuration conf) throws Exception{
    	 String fileName = "test_file";
    	 Path file = new Path(fileName); 
    	 
    	 SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
         Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
         
         Long startTime =  System.currentTimeMillis();
         
         while (reader.next(key, value)) {
             System.out.printf("%s\t%s\n", key, value);
         }
         
         Long endTime =  System.currentTimeMillis();
         IOUtils.closeStream(reader);
         System.out.println(" readTime: " + (endTime-startTime));
		
	}

数据收集

本部分中所有数据均为五次实验的平均值。

生成文件数量(个) 10 50 100 500 1000 2000
耗时(ms) 88 91 106 115 130 170
删除文件数量(个) 10 50 100 500 1000 2000
耗时(ms) 11 44 111 249 391 522
读取文件数量(个) 10 50 100 500 1000 2000
总用时(ms) 7 16 18 24 94 101
平均单个文件用时(ms) 0.7 0.32 0.18 0.048 0.094 0.0505

image-20200713223004444

与前面初等方法的数据图对比可以发现,使用SequenceFile进行数据的读写要高效得多,尤其在生成文件的阶段,由于是连续写入,不需要建立新的索引,生成2000个文件的速度从原先的17秒下降至了不到200毫秒,且随着文件数目的增加,读写的时间成本增加不大,在处理大量文件时仍能保持很快的速度。

较快的读取速度也意味着不会出现前述方法中个别数据读取时间异常长的问题。由于此方法中平均到每个文件的读取时间也出现了随着文件数量的增加而加快的现象,故我们可以推测,该方法也需要在读取起始文件时花费更多的时间,由于读取两千个数据的总时长较短,若再对每个文件进行时长分析会出现大量无意义的结果(时长为0),故省去了这部分数据。

image-20200713223241538

由于未重写删除文件的方法,故测得的删除文件的速度与初等方法差别不大。由于使用 SequenceFile 进行文件创建时所有文件均包含在了一个平面文件中,故可以通过删除该平面文件的方式一次性删除全部文件,提升删除效率。

未实现的测试方法

由于每个文件都会占用 NameNode 约150B 的存储空间,而 NameNode 是唯一的,对其升级较为困难,故可以推测当文件量很大时,会对 NameNode 造成很大的负担,所以分布式系统的性能不止要看数据的读写耗时,还要考虑 NameNode 的负载问题。

使用上述 SequenceFile 方法可以有效提高数据读写效率,但仍需要在 NameNode 存储大量信息。若对数据读写性能要求不高,但文件数量非常大时,NameNode 的高负载便成为了主要问题。此时可以使用二级索引解决问题,即将索引内容打包存储在各 DataNode 的一个特定区域内,在 NameNode 中只存储该索引包的位置信息,NameNode 仅需处理这些索引包,负担大大减少。

结论

HDFS是为处理大型文件设计的分布式文件系统,不适合存储大量小文件。大量的小文件会很快沾满nameNode的内存空间,也会影响NameNode 的寻址时间。但在实际应用中,不可避免地要处理大量的小文件(如每天生成的日志),此时就要求我们将文件数据合并上传,或以 append 方式追加在已有文件的末尾,降低文件的分散程度以降低NameNode的寻址时间。

现在已经出现了若干种基本的解决方案,如TextInputFormat 输入格式,CombineFileInputFormat 输入格式,SequenceFile 技术以及Harballing 技术等,这些技术的核心思想均为将现有的众多小文件通过打包形成大文件,构建较大的分片进行后续处理。

本次实验中对比了SequenceFile 技术与普通方法间的读写性能差异,可以看到处理大量文件时该技术在运行效率上的优化时非常显著的,同时上述 SequenceFile 技术还支持压缩,数据压缩有利于节约磁盘存储空间,也能够减少网络传输时间。

体会

  1. 在《数据结构与算法》课程中学习到一些,例如建立二级索引等方法,在这里都有了实际的应用,之前的课程以算法居多,并没有实地的考虑过数据存储位置、方式对读取产生的影响,感觉是一个将知识融会贯通与实践的过程。
  2. 对于没有学习过ics等计算机课程的同学,由于对于计算机系统、分布式等并不熟悉,所以感觉实习会有一定的难度上手。
  3. 这次实验对物理机的内存要求较高,运行虚拟机的主机原装内存为8GB,扩容至16GB后才勉强可以流畅运行三台虚拟机,此时内存占用已高达85%,主机上再运行其他程序,同步处理数据会偶尔卡死,耽误了很多时间。最后采用了先收集原始数据再挂起虚拟机处理数据的步骤完成了这次实习。