Hadoop and Hive with GAUSS
Welcome to the Hadoop and Hive tutorial using GAUSS 16+. This example involves acquiring movie-ratings data and determining the average movie rating each user gave. The tutorial assumes that Hadoop and Hive are both installed and configured. All tests were performed under Ubuntu 14.04.1 with Hadoop 2.7.2 and Hive 1.2.1. All hduser references are to the local account used for testing.
Data Preparation
These instructions assume your Hadoop cluster is running on Linux. All data preparation commands are to be performed on the machine where Hadoop is installed (the HDFS host).
- Fetch the sample data:
$ wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
$ unzip ml-100k.zip
$ unzip -l ml-100k.zip
Archive:  ml-100k.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
        0  2015-03-31 14:33   ml-100k/
      716  2000-07-19 14:09   ml-100k/allbut.pl
      643  2000-07-19 14:09   ml-100k/mku.sh
     6403  2015-03-31 14:33   ml-100k/README
  1979173  2000-07-19 14:09   ml-100k/u.data
      202  2000-07-19 14:09   ml-100k/u.genre
       36  2000-07-19 14:09   ml-100k/u.info
   236344  2000-07-19 14:09   ml-100k/u.item
      193  2000-07-19 14:09   ml-100k/u.occupation
    22628  2000-07-19 14:09   ml-100k/u.user
  1586544  2001-03-08 11:33   ml-100k/u1.base
   392629  2001-03-08 11:32   ml-100k/u1.test
  1583948  2001-03-08 11:33   ml-100k/u2.base
   395225  2001-03-08 11:33   ml-100k/u2.test
  1582546  2001-03-08 11:33   ml-100k/u3.base
   396627  2001-03-08 11:33   ml-100k/u3.test
  1581878  2001-03-08 11:33   ml-100k/u4.base
   397295  2001-03-08 11:33   ml-100k/u4.test
  1581776  2001-03-08 11:34   ml-100k/u5.base
   397397  2001-03-08 11:33   ml-100k/u5.test
  1792501  2001-03-08 11:34   ml-100k/ua.base
   186672  2001-03-08 11:34   ml-100k/ua.test
  1792476  2001-03-08 11:34   ml-100k/ub.base
   186697  2001-03-08 11:34   ml-100k/ub.test
---------                     -------
 16100549                     24 files
- Place the example data file, named u.data, directly into HDFS (this copy is used for the non-HiveQL job in Example 1):
$ hadoop fs -put ml-100k/u.data
- Preview the first 20 lines of the data file we just placed in HDFS. The file has no header row; its tab-separated columns are userid, movieid, rating and unixtime:
$ hadoop fs -cat u.data | head -n20
196    242     3    881250949
186    302     3    891717742
 22    377     1    878887116
244     51     2    880606923
166    346     1    886397596
298    474     4    884182806
115    265     2    881171488
253    465     5    891628467
305    451     3    886324817
  6     86     3    883603013
 62    257     2    879372434
286   1014     5    879781125
200    222     5    876042340
210     40     3    891035994
224     29     3    888104457
303    785     3    879485318
122    387     5    879270459
194    274     2    879539794
291   1042     4    874834944
234   1184     2    892079237
- Start the Beeline command-line interface (CLI) for Hive to perform the data preparation steps (we omit specifying 'default' as the database, since it is selected automatically):
$ beeline -u jdbc:hive2://localhost:10000 -n hduser
- Create a table in Hive to hold our example data:
0: jdbc:hive2://localhost:10000> CREATE TABLE u_data (
    userid INT,
    movieid INT,
    rating INT,
    unixtime STRING)
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE;
- Import our example data into the Hive table we just created:
0: jdbc:hive2://localhost:10000> LOAD DATA LOCAL INPATH 'ml-100k/u.data' OVERWRITE INTO TABLE u_data;
- Verify the data was imported by printing the number of records in our table (it should report 100000, since this is the 100k dataset):
0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM u_data;
We now have identical data loaded into a Hive table (u_data) and as a plain file on HDFS (u.data), and can start working with it.
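As a quick first look from within GAUSS itself, you can read the local copy of the file left behind by the unzip step. The following is a minimal sketch, assuming GAUSS was started from the directory containing ml-100k/ and that csvReadM accepts a first|last row range analogous to the column ranges used later in the mapper:

// Hypothetical local preview: rows 1-5 and columns 1-4
// (userid, movieid, rating, unixtime) of the tab-delimited file
x = csvReadM("ml-100k/u.data", 1|5, 1|4, "\t");
print x;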
Example 1 - Local Hadoop batch job
Since the data in u.data is tab-delimited and we don't want the movieid or unixtime columns, our mapper will remove those fields.
GAUSS Mapper (mapcolumns.gss)
// Exit if stdin is at end-of-file, i.e. there is no input to process
if eof(__STDIN); end; endif;

// Read columns 1-3 (userid, movieid, rating) of the tab-delimited
// data arriving on stdin; the fourth column (unixtime) is not read
stats = csvReadM(__STDIN, 1, 1|3, "\t");

// Print only the userid and rating columns
format /rdt;
print stats[.,1]~stats[.,3];
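To see exactly which fields the mapper's final statement keeps, here is a small stand-alone illustration you can run directly in GAUSS without Hadoop; the three rows are taken from the data preview above:

// Three sample rows of u.data: userid, movieid, rating
stats = { 196 242 3,
          186 302 3,
           22 377 1 };

// Keep only userid (column 1) and rating (column 3)
print stats[.,1]~stats[.,3];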
The reducer will pull in the data and perform the calculations, printing out the results:
GAUSS Reducer (reducecolumns.gss)
// Exit if stdin is at end-of-file, i.e. there is no input to process
if eof(__STDIN); end; endif;

// Read the two tab-delimited columns passed on by the mapper:
// userid and rating
stats = csvReadM(__STDIN, 1, 1|2, "\t");

all_ids = stats[.,1];
user_ids = unique(all_ids);

// Column 1 holds each unique userid; column 2 will hold
// that user's average rating
final = user_ids ~ zeros(rows(user_ids), 1);

for i(1, rows(user_ids), 1);

    uid = user_ids[i];

    // Select all ratings given by this user
    user_ratings = selif(stats[.,2], all_ids .== uid);

    // Store the user's mean rating
    final[i,2] = meanc(user_ratings);
endfor;

format /rdt;
print final;
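Before submitting anything to Hadoop, you can sanity-check the reducer's logic directly in GAUSS against the local copy of the data. This sketch reuses only functions that appear in the mapper and reducer above and assumes the ml-100k/u.data path from the data preparation step; its first few rows should match the Hadoop output shown below:

new;

// Read userid, movieid and rating from the local copy of the file
stats = csvReadM("ml-100k/u.data", 1, 1|3, "\t");

// Compute each user's average rating, exactly as the reducer does
user_ids = unique(stats[.,1]);
final = user_ids ~ zeros(rows(user_ids), 1);
for i(1, rows(user_ids), 1);
    final[i,2] = meanc(selif(stats[.,3], stats[.,1] .== user_ids[i]));
endfor;

// Show the first few users' averages for comparison
print final[1:5,.];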
With the proper scripts in place, we can test our code (ensure the following command uses the appropriate paths for the Hadoop streaming jar and the GAUSS scripts):
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-x.x.x.jar \
    -input u.data \
    -output GAUSSExample1 \
    -mapper '/home/hduser/gauss16/tgauss -b /home/hduser/gauss16/mapcolumns.gss' \
    -reducer '/home/hduser/gauss16/tgauss -b /home/hduser/gauss16/reducecolumns.gss' \
    -numReduceTasks 2
Check the output directory for the results:
$ hadoop fs -ls GAUSSExample1
Found 3 items
-rw-r--r--   1 hduser supergroup          0 2015-12-10 15:38 GAUSSExample1/_SUCCESS
-rw-r--r--   1 hduser supergroup      16522 2015-12-10 15:38 GAUSSExample1/part-00000
-rw-r--r--   1 hduser supergroup      16522 2015-12-10 15:38 GAUSSExample1/part-00001
$ hadoop fs -cat GAUSSExample1/part-00000 | head -n20

  1.00000000   3.61029412
  3.00000000   2.79629630
  5.00000000   2.87428571
  7.00000000   3.96526055
  9.00000000   4.27272727
 10.00000000   4.20652174
 12.00000000   4.39215686
 14.00000000   4.09183673
 16.00000000   4.32857143
 18.00000000   3.88086643
 21.00000000   2.67039106
 23.00000000   3.63576159
 25.00000000   4.05128205
 27.00000000   3.24000000
 29.00000000   3.64705882
 30.00000000   3.76744186
 32.00000000   3.31707317
 34.00000000   4.05000000
 36.00000000   3.80000000
You can merge the output into a single file after the job is complete:
$ hadoop fs -getmerge GAUSSExample1 /local/output/file
Example 2 - MapReduce using HiveQL via ODBC
- This will require either the HortonWorks Hive ODBC Driver or the Microsoft Hive ODBC Driver. Note: the driver architecture (32-bit or 64-bit) you select will need to match your GAUSS installation.
- Ensure successful connectivity via the ODBC Data Source Administrator. An excellent resource for this process is page 7 of the HortonWorks Hive ODBC Driver User Guide.
- Run the following file in GAUSS to perform MapReduce using HiveQL:
GAUSS Program (db_hive.gss)
new;

// Attempt to load the ODBC driver, trapping the error if it is missing
trap 1;
id = dbAddDatabase("ODBC");
trap 0;
if scalmiss(id);
    print "Driver not found.";
    end;
endif;

gauss_home = "/home/hduser/gauss16";
driver = "HortonWorks Hive ODBC Driver";

// Connection settings
user = "";
password = "";
host = "192.168.0.25";
port = 10000;
use_db = "default";
use_ssl = 1;

// Select the Hive authentication mechanism (AuthMech) that matches
// the credentials supplied above
if strlen(user) > 0 or use_ssl;
    if strlen(password) > 0;
        if use_ssl;
            auth_mech = "4";    // user name and password with SSL
        else;
            auth_mech = "3";    // user name and password
        endif;
        opts = "AuthMech="$+auth_mech$+";UID="$+user$+";PWD="$+password$+";";
    else;
        opts = "AuthMech=2;UID="$+user$+";";    // user name only
    endif;
else;
    opts = "AuthMech=0;";    // no authentication
endif;

// Build the full ODBC connection string and open the connection
dbSetDatabaseName(id, "DRIVER={"$+driver$+"};HOST="$+host$+";PORT="$+ntos(port)$+";HiveServerType=2;Schema="$+use_db$+";"$+opts);
dbOpen(id);

// Use two reducers, as in Example 1
rid = dbExecQuery(id, "set mapred.reduce.tasks=2;");

// Run the job: HiveQL selects and distributes the columns by userid,
// and GAUSS performs the reduce step via reducecolumns.gss
qid = dbExecQuery(id, "from (select userid,rating from u_data distribute by userid) t1 insert overwrite directory 'GAUSSExample2' reduce userid,rating using '"$+gauss_home$+"/tgauss -b "$+gauss_home$+"/reducecolumns.gss';");
dbClose(id);
Verify the job ran successfully and inspect the output:
$ hadoop fs -ls GAUSSExample2
Found 2 items
-rwxr-xr-x   1 hduser supergroup      16489 2015-12-10 15:33 GAUSSExample2/000000_0
-rwxr-xr-x   1 hduser supergroup      16524 2015-12-10 15:33 GAUSSExample2/000001_0
$ hadoop fs -cat GAUSSExample2/000000_0 | head -n20
\N
  2.00000000   3.70967742
  4.00000000   4.33333333
  6.00000000   3.63507109
  8.00000000   3.79661017
 10.00000000   4.20652174
 12.00000000   4.39215686
 14.00000000   4.09183673
 16.00000000   4.32857143
 18.00000000   3.88086643
 20.00000000   3.10416667
 22.00000000   3.35156250
 24.00000000   4.32352941
 26.00000000   2.94392523
 28.00000000   3.72151899
 30.00000000   3.76744186
 32.00000000   3.31707317
 34.00000000   4.05000000
 36.00000000   3.80000000
 38.00000000   3.71900826
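The results do not have to stay on HDFS. While the connection in db_hive.gss is still open (that is, before the dbClose(id) call), the same per-user averages can be pulled straight into a GAUSS matrix; the sketch below assumes the dbQueryFetchAllM function from the GAUSS database suite and lets HiveQL perform the aggregation:

// Compute per-user average ratings in HiveQL and fetch them into
// an N x 2 matrix (userid, average rating); run before dbClose(id)
qid2 = dbExecQuery(id, "select userid, avg(rating) from u_data group by userid");
averages = dbQueryFetchAllM(qid2);
print averages[1:5,.];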
The possibilities for MapReduce with GAUSS are endless. The level of integration can be modified to suit your needs, such as only using GAUSS for the reduce phase, which is exactly what Example 2 does by leaving the map and distribute steps to HiveQL.