Hadoop - Pig
Create New Tuple and Bag
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.BagFactory;
Create Factory
TupleFactory tupleFactory = TupleFactory.getInstance();
BagFactory bagFactory = BagFactory.getInstance();
Use Factory to create Tuple and Bag
Tuple tuple = tupleFactory.newTuple();
DataBag dataBag = bagFactory.newDefaultBag();
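A minimal sketch of filling the new tuple and adding it to the bag (the field values are made up for illustration):
// Tuple.append() grows the tuple one field at a time and, unlike
// Tuple.set(int, Object), does not throw a checked exception.
tuple.append("Alice");   // hypothetical chararray field
tuple.append(42);        // hypothetical int field
dataBag.add(tuple);
System.out.println(dataBag.size());   // prints 1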
Notes: Tuple is an interface, so we cannot create one with Tuple tuple = new Tuple();
Instead, we obtain a TupleFactory first.
public abstract class TupleFactory {
private static TupleFactory gSelf = null;
public static TupleFactory getInstance();
...
}
Quote from the API documentation:
"This class is abstract so that users can override the tuple factory if they desire to provide their own that returns their implementation of a tuple. If the property pig.data.tuple.factory.name is set to a class name and pig.data.tuple.factory.jar is set to a URL pointing to a jar that contains the above named class, then getInstance() will create an instance of the named class using the indicated jar. Otherwise, it will create an instance of DefaultTupleFactory."
getInstance() is used to "get a reference to the singleton factory." Singleton means there will be only one gSelf: if it is not null, the function returns gSelf instead of constructing a new factory.
Output Schema the Easier Way
Instead of building the schema with Schema and FieldSchema from the ground up, we can use Utils.getSchemaFromString()
to generate a schema directly from a string.
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.logicalLayer.schema.Schema;
public Schema outputSchema(Schema input) {
try {
String schemaString = "ScoreBoard:Tuple(Name:chararray, Score:int)";
return Utils.getSchemaFromString(schemaString);
} catch (Exception e) {
return null;
}
}
Call "DESCRIBE" in pig and we see the result as:
A: {ScoreBoard: (Name: chararray,Score: int)}
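For reference, a sketch of an exec() body producing tuples that match this schema (the values are made up):
// Name:chararray, Score:int, as declared in the schema string above.
// Tuple.set() throws ExecException, a subclass of IOException, so it
// may propagate from exec(Tuple input) throws IOException.
Tuple out = TupleFactory.getInstance().newTuple(2);
out.set(0, "Alice");
out.set(1, 100);
return out;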
PigServer
Create PigServer
import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

Properties props = new Properties();
props.setProperty("fs.default.name", "hdfs://<namenode-hostname>:<port>");
props.setProperty("mapred.job.tracker", "<jobtracker-hostname>:<port>");
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
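Queries are then registered with registerQuery(); a small sketch (the alias names and input path are placeholders):
pigServer.registerQuery("raw = LOAD '/tmp/input' USING PigStorage(',');");
pigServer.registerQuery("flt = FILTER raw BY $0 IS NOT NULL;");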
Get Result
// pigServer.store() returns an ExecJob
ExecJob result = pigServer.store("...", path);
Iterator<Tuple> resultIterator = result.getResults();
// or
Iterator<Tuple> resultIterator = pigServer.store("...", path).getResults();
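A minimal sketch of walking the returned iterator (the alias and output path in store() stay as placeholders):
while (resultIterator.hasNext()) {
    Tuple t = resultIterator.next();
    System.out.println(t);   // tuples print in the (f1,f2,...) form
}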
Use Snappy
set output.compression.enabled true;
set output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;
Pig Types
public class DataType {
public static final byte UNKNOWN = 0;
public static final byte NULL = 1;
public static final byte BOOLEAN = 5; // internal use only
public static final byte BYTE = 6; // internal use only
public static final byte INTEGER = 10;
public static final byte LONG = 15;
public static final byte FLOAT = 20;
public static final byte DOUBLE = 25;
public static final byte BYTEARRAY = 50;
public static final byte CHARARRAY = 55;
public static final byte MAP = 100;
public static final byte TUPLE = 110;
public static final byte BAG = 120;
public static final byte ERROR = -1;
// ... more constants and helper methods omitted
}
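These constants are useful when a UDF must branch on the runtime type of a field; a sketch using DataType.findType() (the field value here is hypothetical):
import org.apache.pig.data.DataType;

// 'field' would typically come from input.get(i) inside a UDF's exec().
Object field = "some value";               // hypothetical field value
byte type = DataType.findType(field);
if (type == DataType.CHARARRAY) {
    String s = (String) field;             // chararray maps to String
} else if (type == DataType.INTEGER) {
    int i = (Integer) field;
}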
Set Memory
pig -Dmapred.child.java.opts=-Xmx2G foo.pig
Count Lines
raw = LOAD '/path/to/input';   -- the path is a placeholder
grp = GROUP raw ALL;
cnt = FOREACH grp GENERATE COUNT($1);
dump cnt;
Open File from HDFS
If a UDF needs to open a file (e.g., a config/property file), a plain path resolves locally on each node, and there is no good way to distribute and read such a file. Instead, we can read the file from HDFS using FileLocalizer:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.pig.impl.io.FileLocalizer;

InputStream stream = FileLocalizer.openDFSFile(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
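Reading it line by line (a sketch; readLine() throws IOException, which exec() may propagate):
String line;
while ((line = reader.readLine()) != null) {
    // parse each line; the format is whatever the UDF expects
}
reader.close();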
Load Multiple Data
Hadoop glob patterns in the LOAD path load several inputs at once; the glob can be passed in as a parameter (via -param or a param file), e.g.:
weeknum={101,102,103}
data = LOAD '/logs/week$weeknum';   -- expands to /logs/week{101,102,103}; the path is a placeholder
Troubleshooting
- Parameters set by -param and -param_file are not visible inside imported macros; pass them as arguments when calling the macro.
- After calling DISTINCT, a subsequent filter by SIZE may fail.
- If an inner join returns more rows than the original set, there are duplicate join keys.
- COUNT ignores rows whose first column is null; use COUNT_STAR to include them.
Eclipse Run Configuration
Main tab:
- Project: (project name)
- Main class: org.apache.pig.Main
Arguments tab:
- Program arguments: -x local /{PATH}/{TO}/script.pig
- VM arguments: -Xmx1G -Xms1G
Vim Pig Plugins
http://www.vim.org/scripts/script.php?script_id=2186
Split Data
Materialize RANDOM() in a FOREACH first, so that both SPLIT conditions test the same value; otherwise each condition would draw its own random number and a row could land in both branches or neither.
data = FOREACH data GENERATE target, features, RANDOM() as random;
SPLIT data INTO training IF random <= 0.9,
               test IF random > 0.9;
Code Examples
A = LOAD 'src/main/pig/testdata' AS (t:tuple(t1:chararray), b:bag{b1:tuple(b1a:chararray)});
dump A;
((A),{(B),(C),(D)})
((E),{(F),(G)})
B = FOREACH A GENERATE b;
dump B;
({(B),(C),(D)})
({(F),(G)})
describe A;
describe B;
A: {t: (t1: chararray),b: {b1: (b1a: chararray)}}
B: {b: {b1: (b1a: chararray)}}
C = FOREACH B GENERATE FLATTEN(b);
dump C;
(B)
(C)
(D)
(F)
(G)
A = LOAD 'src/main/pig/testdata' AS (s1:chararray, s2:chararray, s3:chararray);
using tab as the delimiter (the PigStorage default):
x y z
result:
(x,y,z)
using space (the whole line lands in the first field):
x y z
result:
(x y z,,)
A = LOAD 'data' AS (t1:tuple(t1a:int, t1b:int,t1c:int),t2:tuple(t2a:int,t2b:int,t2c:int));
(3,8,9) (4,5,6)
(1,4,7) (3,7,5)
(2,5,8) (9,5,8)
using tab, result:
((3,8,9),(4,5,6))
((1,4,7),(3,7,5))
((2,5,8),(9,5,8))
using space, result (the second tuple is lost):
((3,8,9),)
((1,4,7),)
((2,5,8),)
Error: ACCESSING_NON_EXISTENT_FIELD
WARN [main] org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger: org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject(ACCESSING_NON_EXISTENT_FIELD): Attempt to access field which was not found in the input
Solution:
- FLATTEN(null) returns null.
- FLATTEN of an empty bag throws away the row.
A UDF should therefore return an empty bag when the row is to be removed after FLATTEN.
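A sketch of the empty-bag convention in a bag-producing UDF (the class name and pass-through behavior are made up):
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SafeBagUdf extends EvalFunc<DataBag> {
    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag out = BagFactory.getInstance().newDefaultBag();
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return out;   // empty bag: FLATTEN simply drops this row
        }
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, input.get(0));   // pass the field through (illustrative)
        out.add(t);
        return out;
    }
}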
EvalFunc
- Algebraic interface: the function is algebraic, so the combiner can be used
- Accumulator interface: processes tuples in an incremental fashion
- EvalFunc: input: Tuple, output: anything
- Aggregate: input: Bag, output: scalar
Filter functions are eval functions that return a boolean value
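A minimal EvalFunc sketch (the class name and behavior are made up for illustration):
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Hypothetical UDF: upper-cases its first chararray argument.
public class ToUpper extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;
        }
        return ((String) input.get(0)).toUpperCase();
    }
}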
Tips
Store Schema
... USING PigStorage('|','-schema');
Compress Data
use .gz or .bz to save space:
STORE UserCount INTO '/tmp/usercount.gz' USING PigStorage(',');
result would be:
/tmp/usercount.gz/part-r-00000.gz
Check NULL
use IS NULL instead of == NULL; comparing with == NULL yields null rather than true, so the filter matches nothing.
Split String
raw_data = LOAD '...' USING PigStorage() AS (raw_string: chararray);
result = FOREACH raw_data GENERATE FLATTEN(STRSPLIT(raw_string, '\\|', 2));
Sample by Percentage
A = LOAD 'data' AS (f1:int,f2:int,f3:int);
X = SAMPLE A 0.01;
To sample a fixed number of rows:
a = load 'a.txt';
b = group a all;
c = foreach b generate COUNT(a) as numrows;
e = sample a 1000/c.numrows;
Pig vs SQL
SELECT * FROM mytable;
DUMP mytable;
SELECT col1, col2 FROM mytable;
mytable = FOREACH mytable GENERATE col1, col2;
DUMP mytable;
SELECT col1 AS new_col1, col2 AS new_col2 FROM mytable;
mytable = FOREACH mytable GENERATE col1 AS new_col1, col2 AS new_col2;
DUMP mytable;
SELECT col1::integer, col2::varchar FROM mytable;
mytable = FOREACH mytable GENERATE (int)col1, (chararray)col2;
DUMP mytable;
SELECT * FROM mytable LIMIT 10;
mytable = LIMIT mytable 10;
DUMP mytable;
SELECT * FROM mytable ORDER BY col1 ASC;
mytable = ORDER mytable BY col1 ASC;
DUMP mytable;
SELECT * FROM mytable WHERE col1 > 20;
mytable = FILTER mytable BY col1 > 20;
DUMP mytable;
JOIN
SELECT * FROM mytable INNER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1, othertable BY col1;
DUMP mytable;
SELECT * FROM mytable LEFT OUTER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1 LEFT OUTER, othertable BY col1;
DUMP mytable;
SELECT * FROM mytable RIGHT OUTER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1 RIGHT OUTER, othertable BY col1;
DUMP mytable;
SELECT * FROM mytable FULL OUTER JOIN othertable ON mytable.col1 = othertable.col1;
mytable = JOIN mytable BY col1 FULL OUTER, othertable BY col1;
DUMP mytable;
SELECT * FROM mytable, othertable;
mytable = CROSS mytable, othertable;
DUMP mytable;
GROUP BY
SELECT COUNT(*) FROM mytable;
mytable = GROUP mytable ALL;
mytable = FOREACH mytable GENERATE COUNT(mytable);
DUMP mytable;
SELECT COUNT(DISTINCT col1) FROM mytable;
mytable = FOREACH mytable GENERATE col1;
mytable = DISTINCT mytable;
mytable = GROUP mytable ALL;
mytable = FOREACH mytable GENERATE COUNT(mytable) AS cnt;
DUMP mytable;
TABLES
CREATE TABLE newtable AS SELECT * FROM mytable;
STORE mytable INTO '/some_hdfs_folder/newtable' USING PigStorage(',');
DROP TABLE newtable;
RMF /some_hdfs_folder/newtable;
Remove Folders
Writing to an existing path causes an error. To remove the path before writing, use rmf:
rmf '/path/to/output';
STORE data INTO '/path/to/output' USING PigStorage();
Pig Highlight Vim
https://github.com/motus/pig.vim