Hadoop - Pig

Updated: 2019-01-03

Create New Tuple and Bag

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;

Create Factory

TupleFactory tupleFactory   = TupleFactory.getInstance();
BagFactory bagFactory       = BagFactory.getInstance();

Use Factory to create Tuple and Bag

Tuple tuple     = tupleFactory.newTuple();
DataBag dataBag = bagFactory.newDefaultBag();
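
These objects start out empty. A minimal sketch of filling them (the field values are made up):

tuple.append("Alice");   // Tuple.append adds one field at a time
tuple.append(10);        // field types can be mixed
dataBag.add(tuple);      // a DataBag holds tuples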

Note: Tuple is an interface, so we cannot create one with Tuple tuple = new Tuple(); instead, we obtain a TupleFactory first.

public abstract class TupleFactory {
    private static TupleFactory gSelf = null;  // cached singleton instance

    public static TupleFactory getInstance();

    ...
}

Quote from the API documentation:

"This class is abstract so that users can override the tuple factory if they desire to provide their own that returns their implementation of a tuple. If the property pig.data.tuple.factory.name is set to a class name and pig.data.tuple.factory.jar is set to a URL pointing to a jar that contains the above named class, then getInstance() will create an instance of the named class using the indicated jar. Otherwise, it will create an instance of DefaultTupleFactory."

getInstance() is used to "get a reference to the singleton factory". Singleton means there is only one gSelf: if it is not null, getInstance() returns gSelf instead of constructing a new factory.
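
Because the factory is cached in gSelf, any override has to be in place before the first getInstance() call. A sketch, assuming the two properties above are read as JVM system properties (the class name and jar URL here are hypothetical):

System.setProperty("pig.data.tuple.factory.name", "com.example.MyTupleFactory");
System.setProperty("pig.data.tuple.factory.jar", "file:///path/to/myfactory.jar");
TupleFactory custom = TupleFactory.getInstance();  // now returns the custom factory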

Output Schema the Easier Way

Instead of building the schema from the ground up with Schema and FieldSchema, we can use Utils.getSchemaFromString() to generate it directly from a string.

import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;

@Override
public Schema outputSchema(Schema input) {
    try {
        String schemaString = "ScoreBoard:Tuple(Name:chararray, Score:int)";
        return Utils.getSchemaFromString(schemaString);
    } catch (Exception e) {
        return null;
    }
}

Call "DESCRIBE" in pig and we see the result as:

A: {ScoreBoard: (Name: chararray,Score: int)}

PigServer

Create PigServer

Properties props = new Properties();
props.setProperty("fs.default.name", "hdfs://<namenode-hostname>:<port>");
props.setProperty("mapred.job.tracker", "<jobtracker-hostname>:<port>");
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
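
With a server in hand, register queries and pull results back; a minimal sketch (the input path and schema are made up):

pigServer.registerQuery("A = LOAD '/data/scores' AS (name:chararray, score:int);");
pigServer.registerQuery("B = FILTER A BY score > 0;");

// openIterator runs the plan for alias B and streams its tuples back
Iterator<Tuple> it = pigServer.openIterator("B");
while (it.hasNext()) {
    System.out.println(it.next());
}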

Get Result

// pigServer.store() returns an ExecJob
ExecJob result = pigServer.store("...", path);
Iterator<Tuple> resultIterator = result.getResults();

// or
Iterator<Tuple> resultIterator = pigServer.store("...", path).getResults();

Use Snappy

set output.compression.enabled true;
set output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;

Pig Types

public class DataType {
    public static final byte UNKNOWN   =   0;
    public static final byte NULL      =   1;
    public static final byte BOOLEAN   =   5; // internal use only
    public static final byte BYTE      =   6; // internal use only
    public static final byte INTEGER   =  10;
    public static final byte LONG      =  15;
    public static final byte FLOAT     =  20;
    public static final byte DOUBLE    =  25;
    public static final byte BYTEARRAY =  50;
    public static final byte CHARARRAY =  55;
    public static final byte MAP       = 100;
    public static final byte TUPLE     = 110;
    public static final byte BAG       = 120;
    public static final byte ERROR     =  -1;
    // more code here
}
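
These constants are what DataType's helper methods work with; a small sketch (example values only):

byte t = DataType.findType("hello");           // returns DataType.CHARARRAY
System.out.println(DataType.findTypeName(t));  // prints "chararray"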

Set Memory

pig -Dmapred.child.java.opts=-Xmx2G foo.pig

Count Lines

grp = GROUP raw ALL;
cnt = FOREACH grp GENERATE COUNT($1);
dump cnt;

Open File from HDFS

If a UDF needs to open a file (e.g. a config/properties file), the path it receives is a local path on each node, and there is no good way to distribute and read such a file. Instead, read the file from HDFS using FileLocalizer:

import org.apache.pig.impl.io.FileLocalizer;

// openDFSFile resolves the path against HDFS instead of the local disk
InputStream stream = FileLocalizer.openDFSFile(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));

Load Multiple Data

The path given to LOAD accepts Hadoop glob patterns, so several inputs can be read in one statement. For example, to load three weekly partitions at once (the base path is made up):

data = LOAD '/logs/weeknum={101,102,103}' USING PigStorage();

Troubleshooting

  • Parameters set with -param and -param_file are not visible to imported macros; pass them explicitly as arguments when calling the macro.
  • After calling DISTINCT, filtering by SIZE may fail.
  • If an inner join produces more rows than the original relation, the join keys contain duplicates.
  • COUNT ignores rows whose first column is null (use COUNT_STAR to count every row).

Eclipse Run Configuration

Main tab:

  • Project: (project name)
  • Main class: org.apache.pig.Main

Arguments tab:

  • Program arguments: -x local /{PATH}/{TO}/script.pig
  • VM arguments: -Xmx1G -Xms1G

Vim Pig Plugins

http://www.vim.org/scripts/script.php?script_id=2186

Split Data

data = FOREACH data GENERATE target, features, RANDOM() as random;
SPLIT data INTO training IF random <= 0.9,
                test IF random > 0.9;

Code Examples

A = LOAD 'src/main/pig/testdata' AS (t:tuple(t1:chararray), b:bag{b1:tuple(b1a:chararray)});
((A),{(B),(C),(D)})
((E),{(F),(G)})

B = FOREACH A GENERATE b;
DUMP B;
({(B),(C),(D)})
({(F),(G)})


describe A;
describe B;

A: {t: (t1: chararray),b: {b1: (b1a: chararray)}}
B: {b: {b1: (b1a: chararray)}}

C = FOREACH B GENERATE FLATTEN(b);
(B)
(C)
(D)
(F)
(G)

PigStorage splits fields on tab by default, so a space-separated line ends up entirely in the first field:

A = LOAD 'src/main/pig/testdata' AS (s1:chararray, s2:chararray, s3:chararray);

Using tab:

x     y     z

Result:

(x,y,z)

Using space:

x y z

Result:

(x y z,,)

A = LOAD 'data' AS (t1:tuple(t1a:int, t1b:int,t1c:int),t2:tuple(t2a:int,t2b:int,t2c:int));
(3,8,9) (4,5,6)
(1,4,7) (3,7,5)
(2,5,8) (9,5,8)

Using tab:

((3,8,9),(4,5,6))
((1,4,7),(3,7,5))
((2,5,8),(9,5,8))

Using space:

((3,8,9),)
((1,4,7),)
((2,5,8),)

Error: ACCESSING_NON_EXISTENT_FIELD

WARN [main] org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger: org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject(ACCESSING_NON_EXISTENT_FIELD): Attempt to access field which was not found in the input

Solution:

  • FLATTEN(null) returns null.
  • FLATTEN of an empty bag throws away the whole row.

A UDF should therefore return an empty bag rather than null when it has nothing to emit, so the row is removed after FLATTEN.
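
A hypothetical sketch of that pattern (the class name and the pass-through logic are made up):

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class EmptyBagOnNull extends EvalFunc<DataBag> {
    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag bag = (input == null || input.size() == 0)
                ? null
                : (DataBag) input.get(0);
        // returning null would survive FLATTEN as a null row;
        // an empty bag makes FLATTEN drop the row entirely
        return (bag != null) ? bag : BAG_FACTORY.newDefaultBag();
    }
}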

EvalFunc

  • Algebraic interface: the function is algebraic, so a combiner can be used

  • Accumulator interface: processes tuples in an incremental fashion

  • EvalFunc: input: Tuple, output: anything

  • Aggregate: input: Bag, output: scalar

Filter functions are eval functions that return a boolean value; see the sketch below.
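
A minimal filter function sketch (the name and logic are made up); it could be called as good = FILTER data BY IsPositive(score);

import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsPositive extends FilterFunc {
    @Override
    public Boolean exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return false;  // treat missing values as non-matching
        }
        return ((Number) input.get(0)).intValue() > 0;
    }
}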

Tips

Store Schema

... USING PigStorage('|','-schema');

Compress Data

Use .gz or .bz to save space:

STORE UserCount INTO '/tmp/usercount.gz' USING PigStorage(',');

result would be:

/tmp/usercount.gz/part-r-00000.gz

Check NULL

Use IS NULL instead of == NULL: comparing anything to null yields null, so a == NULL filter never matches.

Split String

raw_data = LOAD '...' USING PigStorage() AS (raw_string: chararray);
result = FOREACH raw_data GENERATE FLATTEN(STRSPLIT(raw_string, '\\|', 2));

Sample by Percentage

A = LOAD 'data' AS (f1:int,f2:int,f3:int);

X = SAMPLE A 0.01;

To sample a fixed number of rows instead (1000 here), compute the total row count first; 1000.0 forces floating-point division:

a = LOAD 'a.txt';
b = GROUP a ALL;
c = FOREACH b GENERATE COUNT(a) AS numrows;
e = SAMPLE a 1000.0 / c.numrows;

Pig vs SQL

SELECT * FROM mytable;
DUMP mytable;

SELECT col1, col2 FROM mytable;
mytable = FOREACH mytable GENERATE col1, col2;
DUMP mytable;

SELECT col1 AS new_col1, col2 AS new_col2 FROM mytable;
mytable = FOREACH mytable GENERATE col1 AS new_col1, col2 AS new_col2;
DUMP mytable;

SELECT col1::integer, col2::varchar FROM mytable;
mytable = FOREACH mytable GENERATE (int)col1, (chararray)col2;
DUMP mytable;

SELECT * FROM mytable LIMIT 10;
mytable = LIMIT mytable 10;
DUMP mytable;

SELECT * FROM mytable ORDER BY col1 ASC;
mytable = ORDER mytable BY col1 ASC;
DUMP mytable;

SELECT * FROM mytable WHERE col1 > 20;
mytable = FILTER mytable BY col1 > 20;
DUMP mytable;

JOIN

SELECT * FROM mytable INNER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1, othertable BY col1;
DUMP mytable;

SELECT * FROM mytable LEFT OUTER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1 LEFT OUTER, othertable BY col1;
DUMP mytable;

SELECT * FROM mytable RIGHT OUTER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1 RIGHT OUTER, othertable BY col1;
DUMP mytable;

SELECT * FROM mytable FULL OUTER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1 FULL OUTER, othertable BY col1;
DUMP mytable;

SELECT * FROM mytable, othertable;
mytable = CROSS mytable, othertable;
DUMP mytable;

GROUP BY

SELECT COUNT(*) FROM mytable;
mytable = GROUP mytable ALL;
mytable = FOREACH mytable GENERATE COUNT(mytable);
DUMP mytable;

SELECT COUNT(DISTINCT col1) FROM mytable;
mytable = FOREACH mytable GENERATE col1;
mytable = DISTINCT mytable;
mytable = GROUP mytable ALL;
mytable = FOREACH mytable GENERATE COUNT(mytable) AS cnt;
DUMP mytable;

TABLES

CREATE TABLE newtable AS SELECT * FROM mytable;
STORE mytable INTO '/some_hdfs_folder/newtable' USING PigStorage(',');

DROP TABLE newtable;
RMF /some_hdfs_folder/newtable;

Remove Folders

Writing to an existing path causes an error. To remove the path before writing, use rmf:

rmf /path/to/output;
STORE data INTO '/path/to/output' USING PigStorage();

Pig Highlight Vim

https://github.com/motus/pig.vim