博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop学习笔记(五):java api 操作hdfs
阅读量:5167 次
发布时间:2019-06-13

本文共 10725 字,大约阅读时间需要 35 分钟。

HDFS的Java访问接口 

  1)org.apache.hadoop.fs.FileSystem

    是一个通用的文件系统API,提供了不同文件系统的统一访问方式。
  2)org.apache.hadoop.fs.Path
    是Hadoop文件系统中统一的文件或目录描述,类似于java.io.File对本地文件系统的文件或目录描述。
  3)org.apache.hadoop.conf.Configuration
    读取、解析配置文件(如core-site.xml/hdfs-default.xml/hdfs-site.xml等),或添加配置的工具类
  4)org.apache.hadoop.fs.FSDataOutputStream
    对Hadoop中数据输出流的统一封装
  5)org.apache.hadoop.fs.FSDataInputStream
    对Hadoop中数据输入流的统一封装

Java访问HDFS主要编程步骤 

    1)构建Configuration对象,读取并解析相关配置文件

    Configuration conf=new Configuration();
  2)设置相关属性
    conf.set("fs.defaultFS","hdfs://1IP:9000");
  3)获取特定文件系统实例fs(以HDFS文件系统实例)
    FileSystem fs=FileSystem.get(new URI("hdfs://IP:9000"),conf,“hdfs");
  4)通过文件系统实例fs进行文件操作(以删除文件实例)
    fs.delete(new Path("/user/liuhl/someWords.txt"));

 

示例代码

1、新建mave项目:hadoop-hdfs-demo。

pom.xml如下:

4.0.0
com.hadoop.demo
hadoop-hdfs-demo
1.0-SNAPSHOT
org.apache.hadoop
hadoop-common
2.8.1
org.apache.hadoop
hadoop-client
2.8.1
org.apache.hadoop
hadoop-hdfs
2.8.1
org.apache.hadoop
hadoop-mapreduce-client-core
2.8.1
org.apache.hadoop
hadoop-auth
2.8.1
log4j
log4j
1.2.17
commons-logging
commons-logging
1.2
org.projectlombok
lombok
1.16.10

 

2、新建连接hadoop的类:ConnectHadoop 

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;public class ConnectHadoop {    public static FileSystem getHadoopFileSystem() {        FileSystem fs = null;        Configuration conf = null;        //此时的conf不需任何设置,只需读取远程的配置文件即可        conf = new Configuration();        // Hadoop的用户名,master机器的登录用户        String hdfsUserName = "root";        URI hdfsUri = null;        try {            // HDFS的访问路径            hdfsUri = new URI("hdfs://192.168.137.100:9000");        } catch (URISyntaxException e) {            e.printStackTrace();        }        try {            // 根据远程的NN节点,获取配置信息,创建HDFS对象            fs = FileSystem.get(hdfsUri,conf,hdfsUserName);        } catch (IOException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }        return fs;    }}
hdfs://192.168.137.100:9000,是master节点下的core-site.xml的配置

 

 

3、测试基本操作类:HadoopHdfsBaseOperation

注意:运行过程中会报以下异常,但是程序可以运行成功,所说是本地要放一个hadoop的二进制包,并且要填写HADOOP_HOME

 

 

import lombok.extern.slf4j.Slf4j;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.IOUtils;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStream;import java.util.Iterator;import java.util.Map;@Slf4jpublic class HadoopHdfsBaseOperation {    public static void main(String[] args){        FileSystem fs = ConnectHadoop.getHadoopFileSystem();        try {            //createDir(fs);            //deleteDir(fs);            //renamePath(fs);            //iteratorPath(fs,new Path("/aa"));            //showAllConf();            //printHdfsFileContent(fs);            //uploadLocalFileToHdfs(fs);            //downloadFileFromHdfs(fs);            copyInHdfs(fs);        }catch (Exception e){            log.error("hdfs error,{}",e);        }finally {            try {                fs.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }    /**     * 创建目录     * @param fs     * @return     */    public static void createDir(FileSystem fs){        boolean b = false;        Path path = new Path("/hzb");        try {            // even the path exist,it can also create the path.            fs.mkdirs(path);            log.info("mkdir success");        } catch (IOException e) {            log.error("mkdir error,{}",e);        }    }    /**     * 删除path,参数true相当于rm -r     * @param fs     * @return     */    public static void deleteDir(FileSystem fs){        boolean b = false;        Path path = new Path("/xxxx/yyyy");        try {            // even the path exist,it can also create the path.            fs.delete(path,true);            log.info("delete dir success");        } catch (IOException e) {            log.error("delete error,{}",e);        }    }    /**     * 删除path,参数true相当于rm -r     * @param fs     * @return     */    public static void renamePath(FileSystem fs){        boolean b = false;        Path oldPath = new Path("/xxxx");        Path newPath = new Path("/zzzz");        try {            // even the path exist,it can also create the path.            fs.rename(oldPath,newPath);            log.info("rename path success");        } catch (IOException e) {            log.error("rename error,{}",e);        }    }    /**     * 遍历文件夹及子文件     * @param hdfs     * @return     */    public static void iteratorPath(FileSystem hdfs,Path listPath){        FileStatus[] files;        try {            files = hdfs.listStatus(listPath);            // 实际上并不是每个文件夹都会有文件的。            if(files.length == 0){                // 如果不使用toUri(),获取的路径带URL。                log.info("==>root dir:{}",listPath.toUri().getPath());            }else {                // 判断是否为文件                for (FileStatus f : files) {                    if (files.length == 0 || f.isFile()) {                        log.info("==>file:{}",f.getPath().toUri().getPath());                    } else {                        // 是文件夹,且非空,就继续遍历                        iteratorPath(hdfs, f.getPath());                    }                }            }        } catch (IOException e) {            log.error("iteratorPath error,{}",e);        }    }    /**     * 读取远程hadoop集群的所有配置文件信息,并以键值对打印出来     */    public static void showAllConf(){        Configuration conf = new Configuration();        conf.set("fs.defaultFS", "hdfs://192.168.137.100:9000");        Iterator
> it = conf.iterator(); log.info("==================================================以下是远程hadoop的配置信息=============="); while(it.hasNext()){ Map.Entry
entry = it.next(); log.info(entry.getKey()+"=" +entry.getValue()); } log.info("==================================================以上是远程hadoop的配置信息=============="); } /** * 将远程hdfs中的test/readme.txt内容读取并打印到console并输出到E */ public static void printHdfsFileContent(FileSystem hdfs){ try { FSDataInputStream is = hdfs.open(new Path("/test/readme.txt")); OutputStream os = new FileOutputStream(new File("E:/hadooptest/readme.txt")); byte[] buff = new byte[1024]; int length = 0; log.info("远程的/test/readme.txt内容如下:=======================》"); while ((length = is.read(buff)) != -1) { System.out.println(new String(buff, 0, length)); os.write(buff, 0, length); os.flush(); } } catch (Exception e){ log.error("printHdfsFileContent error,{}",e); } } /** * 文件上传,将本地的E:/hadooptest/navicat.zip上传到hdfs的/aa * @param hdfs */ public static void uploadLocalFileToHdfs(FileSystem hdfs){ Path HDFSPath = new Path("/aa"); Path localPath = new Path("E:/hadooptest/navicat.zip"); // 如果上传的路径不存在会创建 // 如果该路径文件已存在,就会覆盖 try { hdfs.copyFromLocalFile(localPath,HDFSPath); } catch (IOException e) { e.printStackTrace(); log.error("uploadLocalFileToHdfs error,{}",e); } } /** * 文件下载,将hdfs中/aa/navicat.zip文件下载到E:/hadooptest/,经过测试直接使用hdfs.copyToLocalFile下载不下来,所有用文件流来下载 * @param hdfs */ public static void downloadFileFromHdfs(FileSystem hdfs){// Path HDFSPath = new Path("/aa/navicat.zip");// Path localPath = new Path("E:/hadooptest/");// try {// log.info("====================开始下载=======================");// hdfs.copyToLocalFile(HDFSPath,localPath);// log.info("====================下载结束=======================");// } catch (IOException e) {// e.printStackTrace();// log.error("downloadFileFromHdfs error,{}",e);// } try { FSDataInputStream ifs = hdfs.open(new Path("/aa/navicat.zip")); OutputStream os = new FileOutputStream(new File("E:/hadooptest/navicat.zip")); byte[] buff = new byte[1024]; int length = 0; log.info("============开始下载=======================》"); while ((length = ifs.read(buff)) != -1) { os.write(buff, 0, length); os.flush(); } } catch (Exception e){ log.error("printHdfsFileContent error,{}",e); } } /** * 在hdfs内部之间复制文件 * 使用FSDataInputStream来打开文件open(Path p) * 使用FSDataOutputStream开创建写到的路径create(Path p) * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)来进行具体的读写 * 说明: * 1.java中使用缓冲区来加速读取文件,这里也使用了缓冲区,但是只要指定缓冲区大小即可,不必单独设置一个新的数组来接受 * 2.最后一个布尔值表示是否使用完后关闭读写流。通常是false,如果不手动关会报错的 * @param hdfs */ public static void copyInHdfs(FileSystem hdfs){ Path inPath = new Path("/aa/navicat.zip"); Path outPath = new Path("/test/navicat.zip"); FSDataInputStream hdfsIn = null; FSDataOutputStream hdfsOut = null; try { hdfsIn = hdfs.open(inPath); hdfsOut = hdfs.create(outPath); IOUtils.copyBytes(hdfsIn,hdfsOut,1024*1024*64,false); } catch (IOException e) { log.error("copyInHdfs error,{}",e); } }}

 

转载于:https://www.cnblogs.com/boshen-hzb/p/9097402.html

你可能感兴趣的文章
【Ruby】Ruby在Windows上的安装
查看>>
Objective C 总结(十一):KVC
查看>>
BZOJ 3747 洛谷 3582 [POI2015]Kinoman
查看>>
vue实战(7):完整开发登录页面(一)
查看>>
Visual Studio自定义模板(二)
查看>>
【Mood-20】滴滤咖啡做法 IT工程师加班必备 更健康的coffee 项目经理加班密鉴
查看>>
读《构建之法-软件工程》第四章有感
查看>>
使用 Printf via SWO/SWV 输出调试信息
查看>>
.net 分布式架构之分布式锁实现(转)
查看>>
吴恩达机器学习笔记 —— 3 线性回归回顾
查看>>
Problem E: Automatic Editing
查看>>
SpringBoot 使用 MyBatis 分页插件 PageHelper 进行分页查询
查看>>
《DSP using MATLAB》Problem 6.17
查看>>
微信公众平台开发实战Java版之如何网页授权获取用户基本信息
查看>>
一周TDD小结
查看>>
sizeof与strlen的用法
查看>>
Linux 下常见目录及其功能
查看>>
开源框架中常用的php函数
查看>>
nginx 的提升多个小文件访问的性能模块
查看>>
set&map
查看>>