Hadoop - Java API

Last Updated: 2022-02-06

Create parent folders if they do not exist

Path p = new Path("...");

// fs is an org.apache.hadoop.fs.FileSystem instance
if (!fs.exists(p.getParent())) {
    fs.mkdirs(p.getParent());
}
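
Note: FileSystem.mkdirs already creates any missing parent directories, so the exists check above is defensive rather than required.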

IO

// FileLocalizer is from Apache Pig (org.apache.pig.impl.io.FileLocalizer)
InputStream stream = FileLocalizer.openDFSFile(configsPathHDFS);
Configuration conf = new Configuration();
conf.addResource(new Path(pathHadoopCoreSite));
conf.addResource(new Path(pathHadoopHDFSSite));
FileSystem fs = FileSystem.get(conf);

Path path = new Path(pathStr);
for (FileStatus f : fs.listStatus(path)) {
    if (f.getPath().getName().startsWith("part-")) {
        FSDataInputStream in = fs.open(f.getPath());
        Scanner scanner = new Scanner(new BufferedInputStream(in));
        // do something ...
    }
}

Write to HDFS

JSON

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FSDataOutputStream;

ObjectMapper objectMapper = new ObjectMapper();
FSDataOutputStream ostream = fs.create(new Path(pathString));
objectMapper.writeValue(ostream, myObject);

Hadoop CompressionCodec

  • DEFLATE: org.apache.hadoop.io.compress.DefaultCodec
  • gzip: org.apache.hadoop.io.compress.GzipCodec
  • bzip2: org.apache.hadoop.io.compress.BZip2Codec
  • LZO: com.hadoop.compression.lzo.LzopCodec (from the separate hadoop-lzo library, not bundled with Hadoop)
  • LZ4: org.apache.hadoop.io.compress.Lz4Codec
  • Snappy: org.apache.hadoop.io.compress.SnappyCodec
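
Rather than hard-coding a codec, Hadoop's CompressionCodecFactory can pick one from the file extension. A minimal sketch (the path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.InputStream;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/path/to/file.gz");            // hypothetical path
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(path);     // null if no codec matches the extension
InputStream in = (codec == null)
        ? fs.open(path)
        : codec.createInputStream(fs.open(path));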

GlobStatus

fs.globStatus(new Path("/2000/*/*"), new RegexExcludeFilter("^.*/2000/12/31$"))
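
RegexExcludeFilter is not part of the Hadoop API; a minimal sketch of such a PathFilter, assuming it simply rejects paths whose string form matches the regex:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexExcludeFilter implements PathFilter {
    private final String regex;

    public RegexExcludeFilter(String regex) {
        this.regex = regex;
    }

    @Override
    public boolean accept(Path path) {
        // exclude paths matching the regex
        return !path.toString().matches(regex);
    }
}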

Sort FileStatus

Collections.sort(listStatus, new Comparator<FileStatus>() {
    @Override
    public int compare(FileStatus f1, FileStatus f2) {
        return f1.getPath().getName().compareToIgnoreCase(f2.getPath().getName());
    }
});
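
With Java 8+, the same sort collapses to a lambda:

listStatus.sort(Comparator.comparing((FileStatus f) -> f.getPath().getName(),
        String.CASE_INSENSITIVE_ORDER));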

Load HDFS Files from Java API

Basic Solution

Create a Scanner to load the content of a file /path/to/file on HDFS:

Scanner scanner = new Scanner(FileSystem.get(new Configuration()).open(new Path("/path/to/file")));

Step by Step:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Scanner;

// Load configurations from xml files in $HADOOP_HOME/conf/
Configuration conf = new Configuration();

// Create FileSystem from configuration
FileSystem fs = FileSystem.get(conf);

// Create Hadoop Path
Path path = new Path("/path/to/file");

// Open path as an input stream
FSDataInputStream is = fs.open(path);

// Create scanner
Scanner scanner = new Scanner(is);

Support Local Mode

To support both HDFS and the local file system, use FileSystem.getLocal(conf) instead of FileSystem.get(conf):

boolean localMode;
FileSystem fs;

if (localMode) {
    fs = FileSystem.getLocal(conf);
} else {
    fs = FileSystem.get(conf);
}
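
FileSystem.getLocal(conf) is equivalent to FileSystem.get(URI.create("file:///"), conf); both return a LocalFileSystem backed by the local disk.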

Support Loading a Folder

To read all the files in a folder, list its contents and filter them by path:

FileStatus[] fileStatusList;
Path p = new Path("/path/to");
if (fs.getFileStatus(p).isDirectory()) {
    // filter out paths starting with . or _
    fileStatusList = fs.listStatus(p, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            if (path.getName().startsWith(".") || path.getName().startsWith("_")) {
                return false;
            }
            return true;
        }
    });
} else {
    fileStatusList = new FileStatus[]{fs.getFileStatus(p)};
}

List<Scanner> scanners = new ArrayList<Scanner>();
for (FileStatus f : fileStatusList) {
    scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath()))));
}

Support Compressed Files

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import java.util.zip.GZIPInputStream;

List<Scanner> scanners = new ArrayList<Scanner>();
for (FileStatus f : fileStatusList) {
    String filename = f.getPath().getName();

    log.debug("Creating Scanner for file: {} ", filename);
    if (filename.endsWith(".gz")) {
        scanners.add(new Scanner(new GZIPInputStream(fs.open(f.getPath()))));
    } else if (filename.endsWith(".bz2")) {
        scanners.add(new Scanner(new BZip2CompressorInputStream(fs.open(f.getPath()))));
    } else {
        scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath()))));
    }
}

Put Everything Together

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.zip.GZIPInputStream;

private List<Scanner> getDataScanners(String path, boolean localMode) throws IOException {
    Configuration conf = new Configuration();

    FileSystem fs;
    if (localMode) {
        fs = FileSystem.getLocal(conf);
    } else {
        fs = FileSystem.get(conf);
    }

    FileStatus[] fileStatusList;
    Path p = new Path(path);
    if (fs.getFileStatus(p).isDirectory()) {
        // for a folder, filter out Pig header/metadata files (names starting with . or _)
        fileStatusList = fs.listStatus(p, new PathFilter() {
            @Override
            public boolean accept(Path path) {
                if (path.getName().startsWith(".") || path.getName().startsWith("_")) {
                    return false;
                }
                return true;
            }
        });
    } else {
        fileStatusList = new FileStatus[]{fs.getFileStatus(p)};
    }

    List<Scanner> scanners = new ArrayList<Scanner>();
    for (FileStatus f : fileStatusList) {
        String filename = f.getPath().getName();

        log.debug("Creating Scanner for file: {} ", filename);
        if (filename.endsWith(".gz")) {
            scanners.add(new Scanner(new GZIPInputStream(fs.open(f.getPath()))));
        } else if (filename.endsWith(".bz2")) {
            scanners.add(new Scanner(new BZip2CompressorInputStream(fs.open(f.getPath()))));
        } else {
            scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath()))));
        }
    }

    return scanners;
}
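
A hypothetical call site, reading every line from all the files under a folder:

List<Scanner> scanners = getDataScanners("/path/to/output", false);
for (Scanner s : scanners) {
    while (s.hasNextLine()) {
        String line = s.nextLine();
        // process line ...
    }
    s.close();
}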

Copy From Local To HDFS

// write an object directly to a new HDFS file
FSDataOutputStream os = fs.create(new Path(pathHDFS));
objectMapper.writeValue(os, obj);

or

fs.copyFromLocalFile(new Path(pathLocal), new Path(pathHDFS));
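
The reverse direction works the same way:

fs.copyToLocalFile(new Path(pathHDFS), new Path(pathLocal));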

Read HDFS Files

Configuration conf = new Configuration();
conf.addResource(new Path(this.pathHadoopCoreSite));
conf.addResource(new Path(this.pathHadoopHDFSSite));
FileSystem fs = FileSystem.get(conf);

InputStream stream = fs.open(new Path(path));
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
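
Then read line by line as usual:

String line;
while ((line = reader.readLine()) != null) {
    // process line ...
}
reader.close();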

Driver

Create Configuration(conf)

Configuration conf = new Configuration();

Create Job(conf->job)

Job job = new Job(conf);
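
On Hadoop 2+, this constructor is deprecated in favor of the static factory Job.getInstance(conf).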

Create FileSystem(conf->fs)

FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);

Create Path(path)

Path inputDir = new Path(args[0]);
Path hdfsFile = new Path(args[1]);

Create FileStatus(fs, path -> status)

FileStatus[] inputFiles = local.listStatus(inputDir);

Create FSDataInputStream / FSDataOutputStream(fs, status -> in/out)

FSDataInputStream in = local.open(inputFiles[i].getPath());
FSDataOutputStream out = hdfs.create(hdfsFile);

Create a new Job

Job job = new Job(conf, "Create txn linking detail outputs");
job.setJobName("AccountInfo.jar");
job.setJarByClass(AccountInfo.class);

job.setMapperClass(AccountInfoMapper.class);
job.setReducerClass(AccountInfoReducer.class);
job.setPartitionerClass(KeyPartitioner.class);

job.setNumReduceTasks(conf.getInt("num_reducer", 1));

job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparatorClass(GroupComparator.class);

job.setMapOutputKeyClass(LongTextPair.class);
job.setMapOutputValueClass(LongTextPair.class);

job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
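
waitForCompletion returns true on success, so a driver's main typically ends with:

System.exit(job.waitForCompletion(true) ? 0 : 1);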

  • FSDataInputStream: a specialization of java.io.DataInputStream that supports random access, so it can read from any part of the stream.
  • Path: encodes file and directory names.
  • FileStatus: stores metadata for files and directories.

Create new file

public FSDataOutputStream create(Path f) throws IOException

Append to existing file

public FSDataOutputStream append(Path f) throws IOException

Make directory

public boolean mkdirs(Path f) throws IOException

Delete

public boolean delete(Path f, boolean recursive) throws IOException
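
A quick round trip over create, mkdirs, and delete (paths are hypothetical):

FileSystem fs = FileSystem.get(new Configuration());
Path dir = new Path("/tmp/demo");
fs.mkdirs(dir);                                  // creates missing parents too
FSDataOutputStream out = fs.create(new Path(dir, "data.txt"));
out.writeUTF("hello");
out.close();
fs.delete(dir, true);                            // true = recursive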

Get FileSystem instances (HDFS and local) for interaction:

Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
Path inputDir = new Path(args[0]);
FileStatus[] inputFiles = local.listStatus(inputDir);

Use FSDataInputStream to read the file and FSDataOutputStream to copy it to HDFS.

FSDataInputStream in = local.open(inputFiles[i].getPath());
Path hdfsFile = new Path(args[1]);
FSDataOutputStream out = hdfs.create(hdfsFile);

byte[] buffer = new byte[256];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
    // write each chunk as it is read; writing after the loop would lose all but the last chunk
    out.write(buffer, 0, bytesRead);
}
in.close();
out.close();