PIG Example – 1
Practice PIG:
To learn Pig Latin, let’s question the data. We will practice Pig using a sample movie dataset. The file has a total of 49590 records. Before we start asking questions, we need the data to be accessible in Pig. Start Pig in local mode from the shell, then load the data at the grunt prompt:
$ pig -x local
grunt> movies = LOAD '/usr/lib/pig/movies.csv' USING PigStorage(',') as (id,name,year,rating,duration);
The above statement is made up of two parts. The part to the left of “=” is called the relation or alias; the part to the right is the operation that defines it. The alias looks like a variable, but you should note that it is not a variable. When this statement is executed, no MapReduce task is executed.
Since our dataset has records with fields separated by commas, we use USING PigStorage(','). We have also given names to the fields using the ‘as’ keyword. Now, let’s test to see if the alias has the data we loaded.
grunt> DUMP movies;
Once we execute the above statement, we should see a lot of text on the screen, as shown below:
2013-12-25 23:03:04,550 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2013-12-25 23:03:04,633 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2013-12-25 23:03:04,748 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2013-12-25 23:03:04,805 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2013-12-25 23:03:04,805 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2013-12-25 23:03:04,853 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
................
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.1.2 0.12.0 hduser 2013-12-25 23:03:04 2013-12-25 23:03:05 UNKNOWN
Success!
Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 movies MAP_ONLY file:/tmp/temp-1685410826/tmp1113990343,
Input(s):
Successfully read records from: "/home/hduser/pig/myscripts/movies_data.csv"
Output(s):
Successfully stored records in: "file:/tmp/temp-1685410826/tmp1113990343"
Job DAG:
job_local_0001
................
(49586,Winter Wonderland,2013,2.8,1812)
(49587,Top Gear: Series 19: Africa Special,2013,,6822)
(49588,Fireplace For Your Home: Crackling Fireplace with Music,2010,,3610)
(49589,Kate Plus Ei8ht,2010,2.7,)
(49590,Kate Plus Ei8ht: Season 1,2010,2.7,)
It is only after the DUMP statement that a MapReduce job is initiated. Since we see our data in the output, we can confirm that the data has been loaded successfully. Now that we have the data in Pig, let’s start with the questions.
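To make the effect of PigStorage(',') with a field list more concrete, here is a minimal plain-Python sketch (not Pig code) of what loading one line amounts to. The function name parse_line and the choice to represent a missing value as None are assumptions made for illustration; the input line is reconstructed from the last but one record in the DUMP output above.

```python
# Plain-Python model of LOAD ... USING PigStorage(',') as (id,name,year,rating,duration).
# This only illustrates the idea; it is not how Pig is implemented.
FIELDS = ("id", "name", "year", "rating", "duration")

def parse_line(line):
    """Split one line on commas and pair each value with its field name.

    Empty values become None, mirroring the empty (null) fields that
    appear in the DUMP output for records with missing data.
    """
    values = line.rstrip("\n").split(",")
    return {field: (value if value != "" else None)
            for field, value in zip(FIELDS, values)}

row = parse_line("49589,Kate Plus Ei8ht,2010,2.7,")
print(row)
```

Note that a plain split on the delimiter has no notion of quoting, so a movie title that itself contains a comma would be spread across several fields.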
List the movies that have a rating greater than 4
grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;
grunt> DUMP movies_greater_than_four;
The above statements filter the alias movies and store the results in a new alias, movies_greater_than_four. The movies_greater_than_four alias will have only records of movies where the rating is greater than 4.
The DUMP command is only used to display information onto the standard output. If you need to store the data to a file you can use the following command:
grunt> store movies_greater_than_four into '/usr/lib/pig/movies_greater_than_four';
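The rating filter above can be modelled in plain Python as follows. This is a sketch, not Pig code: the function name rating_above is made up, ratings are kept as strings to mimic an uncast field, and the third row is hypothetical (the first two come from the DUMP output). It also shows one behaviour worth knowing: a record whose rating is missing (null) is dropped, because a comparison involving null is not true.

```python
# Plain-Python model of: FILTER movies BY (float)rating > 4.0
movies = [
    (49586, "Winter Wonderland", 2013, "2.8", 1812),
    (49587, "Top Gear: Series 19: Africa Special", 2013, None, 6822),  # no rating
    (0, "Hypothetical High-Rated Film", 2000, "4.5", 6000),  # hypothetical row
]

def rating_above(rows, threshold):
    """Keep rows whose rating, cast to float, exceeds the threshold."""
    kept = []
    for row in rows:
        rating = row[3]
        if rating is None:
            # A null rating never satisfies the condition, so the row is dropped.
            continue
        if float(rating) > threshold:
            kept.append(row)
    return kept

movies_greater_than_four = rating_above(movies, 4.0)
print(movies_greater_than_four)
```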
List the movies that were released between 1950 and 1960
grunt> movies_between_50_60 = FILTER movies by year>1950 and year<1960;
List the movies that start with the letter A
grunt> movies_starting_with_A = FILTER movies by name matches 'A.*';
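The pattern 'A.*' ends in '.*' because matches tests the whole string against the regular expression rather than searching for the pattern somewhere inside it. A minimal Python sketch of that behaviour, using re.fullmatch as a stand-in; the helper name matches and the title "Alien" are illustrative assumptions:

```python
import re

def matches(value, pattern):
    """Return True only if the entire value matches the pattern."""
    return value is not None and re.fullmatch(pattern, value) is not None

names = ["Alien", "The Mummy", "Night Tide"]  # "Alien" is a hypothetical title
movies_starting_with_a = [n for n in names if matches(n, "A.*")]
print(movies_starting_with_a)

# Without the trailing '.*' the pattern must cover the whole name,
# so only a movie literally called "A" would match.
print(matches("Alien", "A"))
```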
List the movies that have a duration greater than 2 hours
grunt> movies_duration_2_hrs = FILTER movies by duration > 7200;
List the movies that have a rating between 3 and 4
grunt> movies_rating_3_4 = FILTER movies BY rating>3.0 and rating<4.0;
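Both range filters in this section (on year and on rating) use strict comparisons, so the boundary values themselves are excluded. A short Python sketch of the same logic; the helper name strictly_between and the list of years are illustrative assumptions:

```python
def strictly_between(value, low, high):
    """True when low < value < high; the endpoints are excluded."""
    return value is not None and low < value < high

years = [1949, 1950, 1951, 1959, 1960, 1961]
print([y for y in years if strictly_between(y, 1950, 1960)])  # [1951, 1959]

# To include the endpoints, the comparisons would be >= and <= instead.
print([y for y in years if 1950 <= y <= 1960])  # [1950, 1951, 1959, 1960]
```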
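Try a few more commands:
DESCRIBE – The schema of a relation/alias can be viewed using the DESCRIBE command. The output below corresponds to a relation whose fields were declared with types in the LOAD statement (for example id:int); fields declared without a type are reported as bytearray.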
grunt> DESCRIBE movies;
movies: {id: int,name: chararray,year: int,rating: double,duration: int}
ILLUSTRATE – To view the step-by-step execution of a sequence of statements you can use the ILLUSTRATE command:
grunt> ILLUSTRATE movies_duration_2_hrs;
------------------------------------------------------------------------------------------------------------------------
| movies | id:int | name:chararray | year:int | rating:double | duration:int |
------------------------------------------------------------------------------------------------------------------------
| | 1567 | Barney: Sing & Dance with Barney | 2004 | 2.7 | 3244 |
| | 3045 | Strange Circus | 2005 | 2.8 | 6509 |
------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------
| movies_duration_2_hrs | id:int | name:chararray | year:int | rating:double | duration:int |
---------------------------------------------------------------------------------------------------------------------
| | 3045 | Strange Circus | 2005 | 2.8 | 6509 |
---------------------------------------------------------------------------------------------------------------------
DESCRIBE and ILLUSTRATE are really useful for debugging.
FOREACH – FOREACH gives a simple way to apply transformations based on columns. Let’s understand this with an example.
List the movie names and their durations in minutes
grunt> movie_duration = FOREACH movies GENERATE name, (double)duration/60.0;
The above statement generates a new alias that has the list of movies and their durations in minutes. You can check the results using the DUMP command.
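When converting seconds to minutes, the point at which the cast to double happens matters: dividing an integer by an integer yields an integer, so a cast applied after the division cannot bring the fractional minutes back. The sketch below shows the difference in Python, using the duration of "Strange Circus" from the ILLUSTRATE output; Python's // operator stands in for integer division here.

```python
duration_seconds = 6509  # "Strange Circus" in the ILLUSTRATE output above

# Integer division first, then conversion to a floating-point value:
whole_minutes = float(duration_seconds // 60)

# Conversion to floating point first, then division:
exact_minutes = float(duration_seconds) / 60

print(whole_minutes)             # 108.0
print(round(exact_minutes, 2))   # 108.48
```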
GROUP – The GROUP keyword is used to group the tuples of a relation by the value of one or more fields.
List the years and the number of movies released each year.
grunt> grouped_by_year = group movies by year;
grunt> count_by_year = FOREACH grouped_by_year GENERATE group, COUNT(movies);
You can check the result by dumping the count_by_year relation on the screen.
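As a rough mental model of the two statements above, the following Python sketch groups a handful of rows by year and then counts the rows in each group. The rows are a subset of the sample records shown elsewhere in this tutorial, reduced to (id, name, year); the function names are illustrative and this is not how Pig executes the query.

```python
from collections import defaultdict

movies = [
    (2, "The Mummy", 1932),
    (7, "Muriel's Wedding", 1994),
    (8, "Mother's Boys", 1994),
    (10, "Nick of Time", 1995),
]

def group_by_year(rows):
    """Model of GROUP movies BY year: one bag of rows per distinct year."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[2]].append(row)
    return groups

def count_by_year(groups):
    """Model of FOREACH ... GENERATE group, COUNT(movies)."""
    return {year: len(rows) for year, rows in groups.items()}

counts = count_by_year(group_by_year(movies))
print(counts)
```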
We know in advance that the total number of movies in the dataset is 49590. We can check whether our GROUP operation is correct by verifying the total of the COUNT field. If the sum of the count field is 49590, we can be confident that our grouping has worked correctly.
grunt> group_all = GROUP count_by_year ALL;
grunt> sum_all = FOREACH group_all GENERATE SUM(count_by_year.$1);
grunt> DUMP sum_all;
Of the three statements above, the first, GROUP ALL, puts all the tuples into a single group. This is very useful when we need to perform aggregation operations on the entire set. The next statement performs a FOREACH on the grouped relation group_all and applies the SUM function to the field in position 1 (positions start from 0). Here the field in position 1 holds the count of movies for each year. On execution of the DUMP statement the MapReduce program kicks off and gives us the following result:
(49590)
The above value matches the known fact that the dataset has 49590 movies, so we can conclude that our GROUP operation worked successfully.
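The GROUP ALL and SUM steps can be pictured with a small Python sketch. The (year, count) tuples are hypothetical stand-ins for count_by_year, and the single group is keyed "all" purely for illustration; the point is that every tuple lands in one bag and the sum is taken over the field in position 1.

```python
# Hypothetical (year, count) tuples standing in for the count_by_year relation.
count_by_year = [(1932, 1), (1994, 2), (1995, 1)]

# Model of GROUP count_by_year ALL: a single group holding every tuple.
group_all = [("all", count_by_year)]

# Model of FOREACH group_all GENERATE SUM(count_by_year.$1):
# add up the field in position 1 (positions start from 0) of each tuple.
sum_all = [(sum(t[1] for t in bag),) for _key, bag in group_all]
print(sum_all)  # [(4,)]
```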
ORDER BY – Let us question the data to illustrate the ORDER BY operation.
List all the movies in the ascending order of year.
grunt> asc_movies_by_year = ORDER movies BY year ASC;
grunt> DUMP asc_movies_by_year;
List all the movies in the descending order of year.
grunt> desc_movies_by_year = ORDER movies BY year DESC;
grunt> DUMP desc_movies_by_year;
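For comparison, the same two orderings expressed in Python over three of the sample records (reduced to id, name, year). This is only a model of the result, not of how Pig sorts data across a cluster.

```python
from operator import itemgetter

movies = [
    (2, "The Mummy", 1932),
    (3, "Orphans of the Storm", 1921),
    (5, "Night Tide", 1963),
]

# Ascending by year (field in position 2).
ascending = sorted(movies, key=itemgetter(2))
# Descending by year.
descending = sorted(movies, key=itemgetter(2), reverse=True)

print([m[2] for m in ascending])   # [1921, 1932, 1963]
print([m[2] for m in descending])  # [1963, 1932, 1921]
```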
DISTINCT – The DISTINCT statement is used to remove duplicate records. It works only on entire records, not on individual fields.
Let’s illustrate this with an example:
grunt> movies_with_dups = LOAD '/home/hduser/pig/myscripts/movies_with_duplicates.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
grunt> DUMP movies_with_dups;
(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
(9,Nosferatu: Original Version,1929,3.5,5651)
You can see that there are duplicates in this data set. Now let us list the distinct records present in movies_with_dups:
grunt> no_dups = DISTINCT movies_with_dups;
grunt> DUMP no_dups;
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
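The statement that DISTINCT compares entire records can be illustrated with a Python set of tuples. Most rows below are taken from the sample data; the last row is hypothetical and differs from the Nosferatu record only in its duration, so it is not treated as a duplicate. The final sort is there only to make the printed order predictable.

```python
movies_with_dups = [
    (1, "The Nightmare Before Christmas", 1993, 3.9, 4568),
    (1, "The Nightmare Before Christmas", 1993, 3.9, 4568),
    (2, "The Mummy", 1932, 3.5, 4388),
    (9, "Nosferatu: Original Version", 1929, 3.5, 5651),
    (9, "Nosferatu: Original Version", 1929, 3.5, 5651),
    (9, "Nosferatu: Original Version", 1929, 3.5, 9999),  # hypothetical: one field differs
]

# Two records are duplicates only if every field is equal.
no_dups = sorted(set(movies_with_dups))
for record in no_dups:
    print(record)
```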
LIMIT – Use the LIMIT keyword to get only a limited number of results from a relation. Unless the relation has been ordered first, there is no guarantee which records are returned.
grunt> top_10_movies = LIMIT movies 10;
grunt> DUMP top_10_movies;
(1,The Nightmare Before Christmas,1993,3.9,4568)
(2,The Mummy,1932,3.5,4388)
(3,Orphans of the Storm,1921,3.2,9062)
(4,The Object of Beauty,1991,2.8,6150)
(5,Night Tide,1963,2.8,5126)
(6,One Magic Christmas,1985,3.8,5333)
(7,Muriel's Wedding,1994,3.5,6323)
(8,Mother's Boys,1994,3.4,5733)
(9,Nosferatu: Original Version,1929,3.5,5651)
(10,Nick of Time,1995,3.4,5333)
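LIMIT on its own simply takes some number of records; to get a "top N" by a particular field, the relation would need to be ordered by that field first. A small Python sketch of the difference, using four of the sample records reduced to (id, name, rating); ranking by rating is an assumption chosen for illustration.

```python
from itertools import islice

movies = [
    (1, "The Nightmare Before Christmas", 3.9),
    (2, "The Mummy", 3.5),
    (3, "Orphans of the Storm", 3.2),
    (6, "One Magic Christmas", 3.8),
]

# Model of LIMIT alone: take the first two records in whatever order they arrive.
first_two = list(islice(movies, 2))

# Model of ORDER ... BY rating DESC followed by LIMIT: the two highest-rated records.
top_two_by_rating = sorted(movies, key=lambda m: m[2], reverse=True)[:2]

print([m[1] for m in first_two])
print([m[1] for m in top_two_by_rating])
```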
SAMPLE – Use the SAMPLE keyword to get a random sample of your data.
grunt> sample_10_percent = sample movies 0.1;
grunt> dump sample_10_percent;
Here, 0.1 means 10%. We already know that the file has 49590 records, so we can check the count of records in the sampled relation.
grunt> sample_group_all = GROUP sample_10_percent ALL;
grunt> sample_count = FOREACH sample_group_all GENERATE COUNT(sample_10_percent.$0);
grunt> dump sample_count;
The output is (4937), which is approximately 10% of 49590. Because the sample is drawn at random, the exact count will vary from run to run.
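The reason the count is close to, but not exactly, 10% can be seen with a small simulation. The sketch below assumes each record is kept independently with probability 0.1, which is a simplified model of random sampling rather than a description of Pig's internals; the function name sample and the fixed seed are illustrative choices.

```python
import random

def sample(rows, fraction, rng):
    """Keep each row independently with the given probability."""
    return [row for row in rows if rng.random() < fraction]

rng = random.Random(0)    # fixed seed so the sketch is repeatable
records = range(49590)    # stand-ins for the 49590 movie records
sample_10_percent = sample(records, 0.1, rng)

# The size is near 4959 but is not expected to hit it exactly.
print(len(sample_10_percent))
```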