
Hadoop Programming in Practice: HDFS API Programming Examples

For an introduction to HDFS API fundamentals, see:

This article walks through programming examples for the most commonly used HDFS APIs; the details are all in the code below. Comments and corrections are welcome.

package cdh_dev_demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Date;

public class HdfsMethodDemo {
    public static void main(String[] args) throws IOException {
        FileSystem fs = getfs();
        // Invoke the demo methods here, for example:
        // ReadHdfsFile(fs, "/tmp/demo.txt");
        fs.close();
    }

    // Build the HDFS Configuration (points the client at the NameNode)
    private static Configuration getconf(){
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://namenode.cdh.com:8020");
        return conf;
    }

    // Obtain a FileSystem handle for the configured cluster
    private static FileSystem getfs() throws IOException{
        Configuration conf = getconf();
        FileSystem fs = FileSystem.get(conf);
        return fs;
    }
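    // Note (assumption, not in the original demo): if core-site.xml is on the
    // classpath, new Configuration() picks up fs.defaultFS automatically, so the
    // explicit conf.set above becomes optional; FileSystem.get(URI, conf, "someUser")
    // also lets you connect as a specific user rather than the OS process owner.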

    // Copy a local file to HDFS
    public static void CopyLocalToHdfs(FileSystem fs, String LocalPath, String HdfsPath) throws IOException {
        Path local = new Path(LocalPath);
        Path hdfs = new Path(HdfsPath);
        fs.copyFromLocalFile(local, hdfs);
        // List the target path to verify the copy succeeded
        FileStatus[] files = fs.listStatus(hdfs);
        for (FileStatus file : files) {
            System.out.println(file.getPath().toString());
        }
    }
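    // Example call (hypothetical paths, adjust to your environment):
    //   CopyLocalToHdfs(fs, "/tmp/local_data.txt", "/user/demo/");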

    // Copy an HDFS file to the local filesystem
    // (delSrc=false keeps the source; useRawLocalFileSystem=true skips writing the local .crc checksum file)
    public static void CopyHdfsToLocal(FileSystem fs, String LocalPath, String HdfsPath) throws IOException {
        Path local = new Path(LocalPath);
        Path hdfs = new Path(HdfsPath);
        fs.copyToLocalFile(false, hdfs, local, true);
    }
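    // Example call (hypothetical paths), copying back to the client machine:
    //   CopyHdfsToLocal(fs, "/tmp/local_copy.txt", "/user/demo/local_data.txt");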

    // Create an HDFS directory
    public static void MkdirHdfsPath(FileSystem fs,String HdfsPath) throws IOException{
        fs.mkdirs(new Path(HdfsPath));
    }
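    // Example call (hypothetical path): MkdirHdfsPath(fs, "/user/demo/new_dir");
    // mkdirs() also creates any missing parent directories, like mkdir -p.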

    // Copy a file within HDFS by streaming it from source to target
    public static void CopyHdfsToHdfs(FileSystem fs, String SrcHdfsPath, String TargetHdfsPath) throws IOException {
        FSDataInputStream fsin = fs.open(new Path(SrcHdfsPath));
        FSDataOutputStream fsout = fs.create(new Path(TargetHdfsPath));
        // copyBytes with close=true closes both streams when the copy finishes
        IOUtils.copyBytes(fsin, fsout, 1024, true);
    }
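    // Caveat: fs.create() overwrites TargetHdfsPath if it already exists; a larger
    // buffer (e.g. 4096) generally gives better throughput on big files.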

    // Create an empty HDFS file (create() returns a stream, which must be closed)
    public static void CreateHdfsFile(FileSystem fs, String HdfsPath) throws IOException {
        fs.create(new Path(HdfsPath)).close();
    }
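    // Example call (hypothetical path): CreateHdfsFile(fs, "/user/demo/empty.txt");
    // This produces a zero-length file, similar to hdfs dfs -touchz.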

    // Print the metadata of each entry under an HDFS path
    public static void SelHdfsMetadata(FileSystem fs, String HdfsPath) throws IOException {
        FileStatus[] stat = fs.listStatus(new Path(HdfsPath));
        for (FileStatus f : stat) {
            long accessTime = f.getAccessTime();    // last access time (not creation time)
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long blockSize = f.getBlockSize();      // HDFS block size
            String group = f.getGroup();            // owning group
            long len = f.getLen();                  // file length in bytes
            long modificationTime = f.getModificationTime();            // last modification time
            String owner = f.getOwner();            // owning user
            Path path = f.getPath();                // full path
            FsPermission permission = f.getPermission();    // permission bits
            short replication = f.getReplication();         // replication factor
            String string = f.toString();                   // all fields of f as one string
            boolean directory = f.isDirectory();            // is it a directory?
            boolean encrypted = f.isEncrypted();            // is it in an encryption zone?
            boolean file = f.isFile();                      // is it a regular file?

            System.out.println("AccessTime:"+sdf.format(new Date(accessTime)));
            System.out.println("BlockSize:"+blockSize);
            System.out.println("Group:"+group);
            System.out.println("Length:"+len);
            System.out.println("ModificationTime:"+sdf.format(new Date(modificationTime)));
            System.out.println("Owner:"+owner);
            System.out.println("Path:"+path);
            System.out.println("Permission:"+permission);
            System.out.println("Replication:"+replication);
            System.out.println("StatusList:"+string);
            System.out.println("IsDirectory:"+directory);
            System.out.println("IsEncrypted:"+encrypted);
            System.out.println("IsFile:"+file);
            System.out.println("\n");
        }
    }
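    // Example call (hypothetical path): SelHdfsMetadata(fs, "/user/demo");
    // Note: listStatus() on a directory describes its children; on a file it
    // returns a single entry for that file.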

    // Delete every entry under an HDFS path
    public static void DeleteHdfsFile(FileSystem fs, String HdfsPath) throws IOException {
        FileStatus[] stat = fs.listStatus(new Path(HdfsPath));
        for (FileStatus f : stat) {
            System.out.println("IsFile?" + f.isFile());
            System.out.println("IsDirectory?" + f.isDirectory());
            if (f.isFile())
                // delete a single file
                fs.delete(f.getPath(), false);
            else if (f.isDirectory())
                // delete a directory recursively, including its contents
                fs.delete(f.getPath(), true);
        }
    }
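    // Caveat: given a directory, this removes the directory's children but not
    // the directory itself; call fs.delete(path, true) directly to remove both.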

    // Read an HDFS file line by line and print it to stdout
    public static void ReadHdfsFile(FileSystem fs, String HdfsPath) throws IOException {
        FSDataInputStream fin = fs.open(new Path(HdfsPath));
        int i = 0; // line counter
        String s;
        BufferedReader br = new BufferedReader(new InputStreamReader(fin));
        while ((s = br.readLine()) != null) {
            i++;
            System.out.println("Line " + i + ": " + s);
        }
        br.close(); // also closes the underlying input stream
    }
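    // Example call (hypothetical path): ReadHdfsFile(fs, "/user/demo/numbers.txt");
    // InputStreamReader uses the JVM default charset here; pass an explicit
    // charset if the file is not in the platform encoding.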

    // Write the numbers 1 through 10 to an HDFS file, one per line
    public static void WriteHdfsFile(FileSystem fs, String HdfsPath) throws IOException {
        // delete the file first if it already exists
        if (fs.exists(new Path(HdfsPath)))
            DeleteHdfsFile(fs, HdfsPath);
        FSDataOutputStream fout = fs.create(new Path(HdfsPath));
        for (int i = 1; i <= 10; i++) {
            byte[] buffer = (i + "\n").getBytes();
            fout.write(buffer);
        }
        fout.close(); // flush and close so the data is visible to readers
    }
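    // Example call (hypothetical path): WriteHdfsFile(fs, "/user/demo/numbers.txt");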
}
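
Putting it together: the sketch below is a minimal driver showing how the static methods above might be wired up end to end. It is an illustration under assumptions, not part of the original demo; the class name HdfsMethodDemoDriver and all the paths are hypothetical placeholders to replace with values valid on your cluster.

package cdh_dev_demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;

public class HdfsMethodDemoDriver {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode.cdh.com:8020");
        // FileSystem implements Closeable, so try-with-resources closes it for us
        try (FileSystem fs = FileSystem.get(conf)) {
            HdfsMethodDemo.MkdirHdfsPath(fs, "/user/demo/hdfs_demo");          // create a working dir
            HdfsMethodDemo.WriteHdfsFile(fs, "/user/demo/hdfs_demo/nums.txt"); // write 1..10
            HdfsMethodDemo.ReadHdfsFile(fs, "/user/demo/hdfs_demo/nums.txt");  // print it back
            HdfsMethodDemo.SelHdfsMetadata(fs, "/user/demo/hdfs_demo");        // inspect metadata
            HdfsMethodDemo.DeleteHdfsFile(fs, "/user/demo/hdfs_demo");         // clean up contents
        }
    }
}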