Hadoop - Java API
Create parent folders if they do not exist
Path p = new Path("...");
// mkdirs creates any missing parent directories, so the exists check is just an optimization
if (!fs.exists(p.getParent())) {
    fs.mkdirs(p.getParent());
}
IO
// FileLocalizer comes from Apache Pig (org.apache.pig.impl.io.FileLocalizer)
InputStream stream = FileLocalizer.openDFSFile(configsPathHDFS);
Configuration conf = new Configuration();
conf.addResource(new Path(pathHadoopCoreSite));
conf.addResource(new Path(pathHadoopHDFSSite));
FileSystem fs = FileSystem.get(conf);
Path path = new Path(pathStr);
for (FileStatus f : fs.listStatus(path)) {
    if (f.getPath().getName().startsWith("part-")) {
        FSDataInputStream in = fs.open(f.getPath());
        Scanner scanner = new Scanner(new BufferedInputStream(in));
        // do something ...
    }
}
Write to HDFS
JSON
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FSDataOutputStream;

ObjectMapper objectMapper = new ObjectMapper();
FSDataOutputStream ostream = fs.create(new Path(pathString));
// writeValue closes the stream when done (Jackson's AUTO_CLOSE_TARGET is on by default)
objectMapper.writeValue(ostream, myObject);
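Reading the JSON back is symmetric; a minimal sketch, where MyObject is a placeholder for the serialized type:
FSDataInputStream istream = fs.open(new Path(pathString));
MyObject restored = objectMapper.readValue(istream, MyObject.class); // MyObject is a placeholder
istream.close();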
Hadoop CompressionCodec
- DEFLATE: org.apache.hadoop.io.compress.DefaultCodec
- gzip: org.apache.hadoop.io.compress.GzipCodec
- bzip2: org.apache.hadoop.io.compress.BZip2Codec
- LZO: com.hadoop.compression.lzo.LzopCodec
- LZ4: org.apache.hadoop.io.compress.Lz4Codec
- Snappy: org.apache.hadoop.io.compress.SnappyCodec
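Instead of hard-coding a codec, CompressionCodecFactory can infer one from the file extension; a minimal sketch, assuming conf, fs, and path from the snippets above:
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(path); // null when the extension is not recognized
InputStream in = (codec == null)
        ? fs.open(path)
        : codec.createInputStream(fs.open(path));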
GlobStatus
fs.globStatus(new Path("/2000/*/*"), new RegexExcludeFilter("^.*/2000/12/31$"))
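RegexExcludeFilter is not part of the Hadoop API; a minimal sketch of it as a custom PathFilter:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexExcludeFilter implements PathFilter {
    private final String regex;
    public RegexExcludeFilter(String regex) {
        this.regex = regex;
    }
    @Override
    public boolean accept(Path path) {
        // exclude any path whose full string matches the regex
        return !path.toString().matches(regex);
    }
}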
Sort FileStatus
// assuming listStatus is a List<FileStatus>; use Arrays.sort for a FileStatus[]
Collections.sort(listStatus, new Comparator<FileStatus>() {
    @Override
    public int compare(FileStatus f1, FileStatus f2) {
        return f1.getPath().getName().compareToIgnoreCase(f2.getPath().getName());
    }
});
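On Java 8+ the same sort is shorter with Comparator.comparing:
listStatus.sort(Comparator.comparing((FileStatus f) -> f.getPath().getName(), String.CASE_INSENSITIVE_ORDER));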
Load HDFS Files from Java API
Basic Solution
Create a Scanner to load the content of a file /path/to/file on HDFS:
Scanner scanner = new Scanner(FileSystem.get(new Configuration()).open(new Path("/path/to/file")));
Step by Step:
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Load configurations from xml files in $HADOOP_HOME/conf/
Configuration conf = new Configuration();
// Create FileSystem from configuration
FileSystem fs = FileSystem.get(conf);
// Create Hadoop Path
Path path = new Path("/path/to/file");
// Open path as an input stream
FSDataInputStream is = fs.open(path);
// Create scanner
Scanner scanner = new Scanner(is);
Support Local Mode
To support both HDFS and the local filesystem, use FileSystem.getLocal() instead of FileSystem.get():
boolean localMode;
FileSystem fs;
if (localMode) {
    fs = FileSystem.getLocal(conf);
} else {
    fs = FileSystem.get(conf);
}
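When the target cluster is known up front, FileSystem.get(URI, Configuration) selects the filesystem by scheme instead; a sketch with a placeholder namenode address:
import java.net.URI;

FileSystem fs = localMode
        ? FileSystem.getLocal(conf)
        : FileSystem.get(URI.create("hdfs://namenode:8020"), conf); // hdfs://namenode:8020 is a placeholder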
Support Loading a Folder
To read all the files in a folder, list its contents and filter on the path:
FileStatus[] fileStatusList;
Path p = new Path("/path/to");
if (fs.getFileStatus(p).isDirectory()) {
    // filter out paths starting with . or _
    fileStatusList = fs.listStatus(p, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            return !(path.getName().startsWith(".") || path.getName().startsWith("_"));
        }
    });
} else {
    fileStatusList = new FileStatus[]{fs.getFileStatus(p)};
}
List<Scanner> scanners = new ArrayList<Scanner>();
for (FileStatus f : fileStatusList) {
    scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath()))));
}
Support Compressed Files
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import java.util.zip.GZIPInputStream;

List<Scanner> scanners = new ArrayList<Scanner>();
for (FileStatus f : fileStatusList) {
    String filename = f.getPath().getName();
    log.debug("Creating Scanner for file: {}", filename);
    if (filename.endsWith(".gz")) {
        scanners.add(new Scanner(new GZIPInputStream(fs.open(f.getPath()))));
    } else if (filename.endsWith(".bz2")) {
        scanners.add(new Scanner(new BZip2CompressorInputStream(fs.open(f.getPath()))));
    } else {
        scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath()))));
    }
}
Put Everything Together
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.zip.GZIPInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// log is assumed to be an SLF4J-style logger field on the enclosing class
private List<Scanner> getDataScanners(String path, boolean localMode) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs;
    if (localMode) {
        fs = FileSystem.getLocal(conf);
    } else {
        fs = FileSystem.get(conf);
    }
    FileStatus[] fileStatusList;
    Path p = new Path(path);
    if (fs.getFileStatus(p).isDirectory()) {
        // for a folder, filter out Pig header/metadata files (names starting with . or _)
        fileStatusList = fs.listStatus(p, new PathFilter() {
            @Override
            public boolean accept(Path path) {
                return !(path.getName().startsWith(".") || path.getName().startsWith("_"));
            }
        });
    } else {
        fileStatusList = new FileStatus[]{fs.getFileStatus(p)};
    }
    List<Scanner> scanners = new ArrayList<Scanner>();
    for (FileStatus f : fileStatusList) {
        String filename = f.getPath().getName();
        log.debug("Creating Scanner for file: {}", filename);
        if (filename.endsWith(".gz")) {
            scanners.add(new Scanner(new GZIPInputStream(fs.open(f.getPath()))));
        } else if (filename.endsWith(".bz2")) {
            scanners.add(new Scanner(new BZip2CompressorInputStream(fs.open(f.getPath()))));
        } else {
            scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath()))));
        }
    }
    return scanners;
}
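A hypothetical call site, remembering to close the scanners when done (process(...) is a placeholder):
List<Scanner> scanners = getDataScanners("/path/to/data", false);
try {
    for (Scanner scanner : scanners) {
        while (scanner.hasNextLine()) {
            process(scanner.nextLine()); // process(...) is a placeholder
        }
    }
} finally {
    for (Scanner scanner : scanners) {
        scanner.close();
    }
}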
Copy From Local To HDFS
// write an object directly to HDFS as JSON
FSDataOutputStream os = fs.create(new Path(pathHDFS));
objectMapper.writeValue(os, obj);
or
// copy an existing local file to HDFS
fs.copyFromLocalFile(new Path(pathLocal), new Path(pathHDFS));
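The reverse direction is symmetric:
fs.copyToLocalFile(new Path(pathHDFS), new Path(pathLocal));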
Read HDFS Files
Configuration conf = new Configuration();
conf.addResource(new Path(this.pathHadoopCoreSite));
conf.addResource(new Path(this.pathHadoopHDFSSite));
FileSystem fs = FileSystem.get(conf);
InputStream stream = fs.open(new Path(path));
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
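A sketch of draining the reader line by line and closing it when done:
String line;
while ((line = reader.readLine()) != null) {
    // do something with line ...
}
reader.close();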
Driver
Create Configuration(conf)
Configuration conf = new Configuration();
Create Job(conf->job)
Job job = new Job(conf); // deprecated in Hadoop 2.x; prefer Job.getInstance(conf)
Create FileSystem(conf->fs)
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
Create Path(path)
Path inputDir = new Path(args[0]);
Path hdfsFile = new Path(args[1]);
Create FileStatus(fs, path -> status)
FileStatus[] inputFiles = local.listStatus(inputDir);
Create FSDataInputStream / FSDataOutputStream(fs, status -> in/out)
FSDataInputStream in = local.open(inputFiles[i].getPath());
FSDataOutputStream out = hdfs.create(hdfsFile);
Create a new Job
Job job = new Job(conf, "Create txn linking detail outputs");
job.setJobName("AccountInfo.jar");
job.setJarByClass(AccountInfo.class);
job.setMapperClass(AccountInfoMapper.class);
job.setReducerClass(AccountInfoReducer.class);
job.setPartitionerClass(KeyPartitioner.class);
job.setNumReduceTasks(conf.getInt("num_reducer", 1));
job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparatorClass(GroupComparator.class);
job.setMapOutputKeyClass(LongTextPair.class);
job.setMapOutputValueClass(LongTextPair.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
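waitForCompletion(true) returns true on success, so a driver's main method typically converts it into an exit code:
System.exit(job.waitForCompletion(true) ? 0 : 1);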
- FSDataInputStream: a specialization of java.io.DataInputStream with support for random access; it can read from any position in the stream.
- Path: encodes file and directory names.
- FileStatus: stores metadata for files and directories.
Create a new file
public FSDataOutputStream create(Path f) throws IOException
Append to an existing file
public FSDataOutputStream append(Path f) throws IOException
Make directories
public boolean mkdirs(Path f) throws IOException
Delete
public boolean delete(Path f, boolean recursive) throws IOException
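A short sketch exercising these methods (paths are placeholders; note that append is not supported by every FileSystem implementation):
Path dir = new Path("/tmp/demo");
fs.mkdirs(dir); // creates the directory and any missing parents
FSDataOutputStream out = fs.create(new Path(dir, "data.txt")); // create a new file
out.writeUTF("hello");
out.close();
fs.delete(dir, true); // true = recursive delete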
Get FileSystem instance for interaction:
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
Path inputDir = new Path(args[0]);
FileStatus[] inputFiles = local.listStatus(inputDir);
Use FSDataInputStream to read the file and FSDataOutputStream to write it back out; each chunk must be written inside the read loop.
Path hdfsFile = new Path(args[1]);
FSDataOutputStream out = hdfs.create(hdfsFile);
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte[] buffer = new byte[256];
int bytesRead;
while ((bytesRead = in.read(buffer)) > 0) {
    out.write(buffer, 0, bytesRead);
}
in.close();
out.close();