Difference between revisions of "Data"
(→HIVE) |
|||
Line 446: | Line 446: | ||
Data the Google way - Map-Reduce. | Data the Google way - Map-Reduce. | ||
− | Hadoop & Friends. | + | ==Hadoop & Friends== |
+ | |||
+ | ===Streaming MapReduce=== | ||
+ | |||
+ | '''mapper.py''': | ||
+ | |||
+ | <source> | ||
+ | #!/usr/bin/env python | ||
+ | |||
+ | import sys | ||
+ | |||
+ | # input comes from STDIN (standard input) | ||
+ | for line in sys.stdin: | ||
+ | # remove leading and trailing whitespace | ||
+ | line = line.strip() | ||
+ | # split the line into words | ||
+ | words = line.split() | ||
+ | # increase counters | ||
+ | for word in words: | ||
+ | # write the results to STDOUT (standard output); | ||
+ | # what we output here will be the input for the | ||
+ | # Reduce step, i.e. the input for reducer.py | ||
+ | # | ||
+ | # tab-delimited; the trivial word count is 1 | ||
+ | print '%s\t%s' % (word, 1) | ||
+ | </source> | ||
+ | |||
+ | '''reducer.py''': | ||
+ | |||
+ | <source> | ||
+ | #!/usr/bin/env python | ||
+ | |||
+ | from operator import itemgetter | ||
+ | import sys | ||
+ | |||
+ | current_word = None | ||
+ | current_count = 0 | ||
+ | word = None | ||
+ | |||
+ | # input comes from STDIN | ||
+ | for line in sys.stdin: | ||
+ | # remove leading and trailing whitespace | ||
+ | line = line.strip() | ||
+ | |||
+ | # parse the input we got from mapper.py | ||
+ | word, count = line.split('\t', 1) | ||
+ | |||
+ | # convert count (currently a string) to int | ||
+ | try: | ||
+ | count = int(count) | ||
+ | except ValueError: | ||
+ | # count was not a number, so silently | ||
+ | # ignore/discard this line | ||
+ | continue | ||
+ | |||
+ | # this IF-switch only works because Hadoop sorts map output | ||
+ | # by key (here: word) before it is passed to the reducer | ||
+ | if current_word == word: | ||
+ | current_count += count | ||
+ | else: | ||
+ | if current_word: | ||
+ | # write result to STDOUT | ||
+ | print '%s\t%s' % (current_word, current_count) | ||
+ | current_count = count | ||
+ | current_word = word | ||
+ | |||
+ | # do not forget to output the last word if needed! | ||
+ | if current_word == word: | ||
+ | print '%s\t%s' % (current_word, current_count) | ||
+ | </source> | ||
+ | |||
+ | Place some text files into input dir in HDFS. | ||
+ | |||
+ | Script to run it: | ||
+ | |||
+ | <pre> | ||
+ | #!/bin/bash | ||
+ | export HADOOP_MAPRED_HOME='/opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/hadoop-0.20-mapreduce' | ||
+ | hadoop jar $HADOOP_MAPRED_HOME/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \ | ||
+ | -file /home/ggdagw/mapper.py \ | ||
+ | -mapper mapper.py \ | ||
+ | -file /home/ggdagw/reducer.py \ | ||
+ | -reducer reducer.py \ | ||
+ | -input /user/ggdagw/input/* \ | ||
+ | -output /user/ggdagw/output-wc-streaming | ||
+ | </pre> | ||
* http://hadoop.apache.org/docs/r0.18.3/streaming.html | * http://hadoop.apache.org/docs/r0.18.3/streaming.html |
Revision as of 11:50, 5 July 2013
Data: How to surf, rather than drown!
Introduction
Data on Disk
A Salutary Tale of Copying Files
We'll start by considering data stored on a disk drive. One thing that you might not know is that file systems and disk drives perform best when they are dealing with larger files. But how large is large? Here's a simple example, which you can try yourself:
First of all, let's get ourselves a large-ish file. A compressed tarball of the source code for the Linux kernel will do. In this case it's about 70MB in size. We can time how long it takes to create a copy of it. Below are the results of doing this experiment on BlueCrystal phase 2:
BCp2$ wget https://www.kernel.org/pub/linux/kernel/v3.x/testing/linux-3.10-rc7.tar.xz BCp2$ time cp linux-3.10-rc7.tar.xz linux-3.10-rc7.tar.xz.copy real 0m3.530s user 0m0.000s sys 0m0.068s
Now, that tar ball contains around 47,000 files, many of which are only a few hundred or thousand of bytes in size. These are files at the smaller end of the scale. Let's unpack the tarball and time how long it takes to copy these files, one-by-one:
BCp2$ tar --use-compress-program=xz -xf linux-3.10-rc7.tar.xz BCp2$ time cp -r linux-3.10-rc7 linux-3.10-rc7.copy real 18m17.102s user 0m0.214s sys 0m6.359s
Yikes! that took over 350 times longer than copying the single, large file. (These timings were taken at ~10:45 on the 25 Jun 2013. Any differences from the above values will be due to differences in load on the filesystem, which is shared with all the other users of the cluster. More of that in a moment..)
Now, we can repeat these tests on a different system. I got the values below from my very standard desktop machine:
desktop$ wget https://www.kernel.org/pub/linux/kernel/v3.x/testing/linux-3.10-rc7.tar.xz desktop$ time cp linux-3.10-rc7.tar.xz linux-3.10-rc7.tar.xz.copy real 0m0.192s user 0m0.000s sys 0m0.156s desktop$ tar --use-compress-program=xz -xf linux-3.10-rc7.tar.xz desktop$ time cp -r linux-3.10-rc7 linux-3.10-rc7.copy real 0m25.961s user 0m0.168s sys 0m2.360s
That's a lot quicker! However, copying the small files still took over 130 times longer than copying the large file. (Again, your mileage may vary.)
But hang on, I thought BlueCrystal was meant to be "super"?! Well it is. It's just that it's filesystem is servicing much more than just your file copying request.
Modern SATA disks have read and write bandwidths close to 100MB/s--for example, I just tested my Linux desktop machine with a handy, built-in utility (System > Administration > Disk utility) and recorded a read performance of ~75MB/s. We can compare this to the filesystem on BlueCrystal phase 2, where we see a peak of about 500MB/s throughput on the mixed workload of full, running cluster.
Another test below highlights a key difference between a parallel filesystem and that on a single disk. If I start several processes writing to the disk in my desktop machine, I see a rapid drop-off in performance as the number of processes increases. In contrast the parallel filesystem is able to support many processes with modest degradation to file writing performance.
System | Clients x1 | Clients x4 | Clients x8 |
Desktop | ~65MB/s | ~10MB/s | ~0.5MB/s |
Parallel FS | ~160MB/s | 130MB/s | 67MB/s |
So what have we learned?
- You'll get better file access performance if you use larger rather than smaller files.
- If you are using BlueCrystal, you may want to consider using disks local to the compute nodes. These nodes are 300GB in size and are accessible via /local. If you do elect to use local disks:
- It is advisable to check the available space in /local as part of your job.
- Please do clean up /local after your job has finished
- when transferring data from node to node, please use the suffix .data.cluster when referring to a node, e.g. scp u04n037.data.cluster:/local/mydata $HOME.
Additionally (and not demonstrated above):
- Make use of minimal seek operations, which implies
- Make use of contiguous access patterns, rather than random ones.
Data over the Network
Filling the pipe.
Data when Writing your own Code
Locality of Reference: http://en.wikipedia.org/wiki/Locality_of_reference
Temporal locality: We expect to re-use of data already seen. Spatial locality: We expect to access data stored close to data that we've already seen.
Computer hardware is optimised to exploit these principles. We will get the most from our hardware if we design software accordingly.
Memory Hierarchy
L1 Cache | Picking up a book off your desk (~3s) |
L2 Cache | Getting up and getting a book off a shelf (~15s) |
Main Memory | Walking down the corridor to another room (several minutes) |
Disk | Walking the coastline of Britain (about a year) |
Files & File Formats
Data Analytics
Some common operations you may want to perform on your data:
- Cleaning
- Filtering
- Calculating summary statics (means, medians, variances)
- Creating plots & graphics
- Tests of statistical significance
- Sorting and searching
Selecting the right tools.
Databases
It may be tempting to store your data in many small (text) files. Perhaps this provides an intuitive way to catalogue the information. However, we saw above that you will suffer a performance penalty if you store your data this way. If your data is amenable to being stored in tabular form, i.e. in rows and columns, then you may well be better off using one of many available database packages. These packages are optimised for search performance and excel at random access patterns for both reading and writing data.
SQL (the Structured Query Language) is a language designed for working with databases and is common to the examples given below.
SQLite
A very popular relational database. Lightweight as it is unmanaged and so has very simple access controls. However does support SQL. The command line interface is widely available. For example, it is installed on BlueCrystal.
Let's start up the command line interface:
sqlite3 test.db
In this case, we've specified the file test.db. If the file exists, it will open it. Else it will create a new database to be stored in the given file.
Without further ado, let's create a table in given database and populate it with some records:
sqlite> CREATE TABLE planets(Id INT, Name TEXT, Diameter REAL, Mass REAL, Orbital_Period REAL);
sqlite> INSERT INTO planets VALUES(1,'Mercury',0.382,0.06,0.24);
sqlite> INSERT INTO planets VALUES(2,'Venus',0.949,0.82,0.72);
sqlite> INSERT INTO planets VALUES(3,'Earth',1.0,1.0,1.0);
sqlite> INSERT INTO planets VALUES(4,'Mars',0.532,0.11,1.52);
sqlite> INSERT INTO planets VALUES(5,'Jupiter',11.209,317.8,5.20);
sqlite> INSERT INTO planets VALUES(6,'Saturn',9.449,95.2,9.54);
sqlite> INSERT INTO planets VALUES(7,'Uranus',4.007,14.6,19.22);
sqlite> INSERT INTO planets VALUES(8,'Neptune',3.883,17.2,30.06);
Now, let's see the fruits of our labours. After setting some formatting information, we can use an SQL command to select all the records in the planets table:
sqlite> .mode column
sqlite> .headers on
sqlite> SELECT * FROM planets;
Id Name Diameter Mass Orbital_Period ---------- ---------- ---------- ---------- -------------- 1 Mercury 0.382 0.06 0.24 2 Venus 0.949 0.82 0.72 3 Earth 1.0 1.0 1.0 4 Mars 0.532 0.11 1.52 5 Jupiter 11.209 317.8 5.2 6 Saturn 9.449 95.2 9.54 7 Uranus 4.007 14.6 19.22 8 Neptune 3.883 17.2 30.06
We can also issue a more exacting query. In this case, let's ask for all the planets which have a mass greater than or equal to that of the Earth:
sqlite> SELECT * FROM planets WHERE Mass >= 1.0;
Id Name Diameter Mass Orbital_Period ---------- ---------- ---------- ---------- -------------- 3 Earth 1.0 1.0 1.0 5 Jupiter 11.209 317.8 5.2 6 Saturn 9.449 95.2 9.54 7 Uranus 4.007 14.6 19.22 8 Neptune 3.883 17.2 30.06
So far so good. Let's create an additional table called moons:
sqlite> CREATE TABLE moons(Name TEXT, Num_Moons INT);
sqlite> INSERT INTO moons VALUES('Mercury',0);
sqlite> INSERT INTO moons VALUES('Venus',0);
sqlite> INSERT INTO moons VALUES('Earth',1);
sqlite> INSERT INTO moons VALUES('Mars',2);
sqlite> INSERT INTO moons VALUES('Jupiter',67);
sqlite> INSERT INTO moons VALUES('Saturn',62);
sqlite> INSERT INTO moons VALUES('Uranus',27);
sqlite> INSERT INTO moons VALUES('Neptune',13);
sqlite> INSERT INTO moons VALUES('Pluto',5);
Now that we have two tables, we can examine the very powerful feature of JOINing tables , common to all good relational databases. A, so called, natural inner join will create a new table, on-the-fly, from all the records in the joined tables which have matching values:
sqlite> SELECT Name, Orbital_Period, Num_moons FROM planets NATURAL JOIN moons;
Name Orbital_Period Num_Moons ---------- -------------- ---------- Mercury 0.24 0 Venus 0.72 0 Earth 1.0 1 Mars 1.52 2 Jupiter 5.2 67 Saturn 9.54 62 Uranus 19.22 27 Neptune 30.06 13
Note that Pluto was not listed as a result of the inner join, since it is not present in the planets table.
We can also create an outer join, which is not so constrained. SQLite does not, as yet, support a 'right outer join' and so I needed to swap the order of the tables in the join so that my 'left outer join' contained all those in the left table (i.e. the table containing the name Pluto).
sqlite> SELECT Name, Orbital_Period, Num_moons FROM moons NATURAL LEFT OUTER JOIN planets;
Notice that Pluto record is blank in the Orbital_Period column as there is no corresponding value in the planets table.
Name Orbital_Period Num_Moons ---------- -------------- ---------- Mercury 0.24 0 Venus 0.72 0 Earth 1.0 1 Mars 1.52 2 Jupiter 5.2 67 Saturn 9.54 62 Uranus 19.22 27 Neptune 30.06 13 Pluto 5
If you would like to export to, e.g., a CSV file (this is useful for subsequent import into, e.g., R):
sqlite> .mode csv
sqlite> .output planets.csv
sqlite> SELECT * FROM planets WHERE Mass >= 1.0;
Where the contents of the file planets.csv is:
Id,Name,Diameter,Mass,Orbital 3,Earth,1.0,1.0,1.0 5,Jupiter,11.209,317.8,5.2 6,Saturn,9.449,95.2,9.54 7,Uranus,4.007,14.6,19.22 8,Neptune,3.883,17.2,30.06
There is, of course a corresponding command to import data from a file into a table. For example, if I had information about stars in an appropriately formatted CSV files, I could load it into a table called stars using the commands:
sqlite> .separator ','
sqlite> .import stars.csv stars
To exit the SQLite command line interpreter type:
sqlite> .exit
It is perfectly possible--indeed, perhaps preferable--to access a database from inside a program. (SQLite was really designed with that in mind.) For example, you can learn more about accessing an SQLite database from inside a python script at:
More information about SQLite and other interfaces for access is at:
- SQLite: e.g. http://zetcode.com/db/sqlite/
MySQL
Taking a step up on the functionality ladder, MySQL is a popular, open-source, enterprise grade relational database management system (RDBMS), which is readily available for most operating systems. In the notes below, I will assume that you have MySQL installed and have set a password for the root user. For popular Linux distributions, such as Ubuntu and CentOS, MySQL is easily installed through the package manager. (More information on MySQL installation.)
OK, our first tasks will be to connect to the MySQL monitor tool as the administrator, and to create new users and databases:
gethin@gethin-desktop:~$ mysql -u root -p Enter password:
After typing the administrator password we are greeted with:
Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 36 Server version: 5.1.69-0ubuntu0.10.04.1 (Ubuntu) Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql>
Let's create a user. Note that the password, changeme in this case, is in clear text.
mysql> CREATE USER 'gethin'@'localhost' IDENTIFIED BY 'changeme';
Query OK, 0 rows affected (0.03 sec)
A database, called menagerie, to practice with:
mysql> CREATE DATABASE menagerie;
Query OK, 1 row affected (0.00 sec)
And we'll grant some privileges (in this case ALL--a full set of privileges) for all the tables to be stored in the menagerie database to the user called gethin:
mysql> GRANT ALL ON menagerie.* TO 'gethin'@'localhost';
Query OK, 0 rows affected (0.02 sec)
Now, to work on that database, we want to disconnect as the administrator and connect as an appropriate user:
gethin@gethin-desktop:~$ mysql -u gethin -p
Enter password:
switch to the database in question:
mysql> use menagerie
Database changed
and create a new table:
mysql> CREATE TABLE planets (Id INT NOT NULL AUTO_INCREMENT,
-> Name VARCHAR(10),
-> Diameter REAL,
-> Mass REAL,
-> Orbital_Period REAL,
-> PRIMARY KEY (Id)
-> );
Query OK, 0 rows affected (0.00 sec)
Now, if we had a CSV file of the form:
1,'Mercury',0.382,0.06,0.24 2,'Venus',0.949,0.82,0.72 3,'Earth',1.0,1.0,1.0 4,'Mars',0.532,0.11,1.52 5,'Jupiter',11.209,317.8,5.20 6,'Saturn',9.449,95.2,9.54 7,'Uranus',4.007,14.6,19.22 8,'Neptune',3.883,17.2,30.06
we could load this directly into our planets table, without the labourious SQL INSERT commands:
LOAD DATA LOCAL INFILE 'planets.csv' INTO TABLE planets FIELDS TERMINATED BY ',';
et voila, we can see the data duly loaded into the table:
mysql> SELECT * FROM planets;
+----+-----------+----------+-------+----------------+
| Id | Name | Diameter | Mass | Orbital_Period |
+----+-----------+----------+-------+----------------+
| 1 | 'Mercury' | 0.382 | 0.06 | 0.24 |
| 2 | 'Venus' | 0.949 | 0.82 | 0.72 |
| 3 | 'Earth' | 1 | 1 | 1 |
| 4 | 'Mars' | 0.532 | 0.11 | 1.52 |
| 5 | 'Jupiter' | 11.209 | 317.8 | 5.2 |
| 6 | 'Saturn' | 9.449 | 95.2 | 9.54 |
| 7 | 'Uranus' | 4.007 | 14.6 | 19.22 |
| 8 | 'Neptune' | 3.883 | 17.2 | 30.06 |
+----+-----------+----------+-------+----------------+
8 rows in set (0.00 sec)
UoB Data Haven
The University maintains a central Oracle database which is available to members of staff to store their data in a managed and backed-up environment. To learn more, visit: http://www.bris.ac.uk/it-services/applications/infosystems/datahaven/.
GUI Tools
If you would prefer to access your data and tables via a GUI, SQL Navigator is a popular, cross-platform tool. The image below shows access to the planets table in the menagerie database:
The University also runs courses on learning and using MS Access. More information visit: http://www.bris.ac.uk/it-services/learning/staff.html.
Numerical Packages
Such as R, MATLAB & Python.
Bespoke Applications
Rolling Your Own
Principles: Sort & binary search.
Tools: Languages, libraries and packages.
When Data gets Big
Quotas.
Local Disks.
Swapping.
Data the Google way - Map-Reduce.
Hadoop & Friends
Streaming MapReduce
mapper.py:
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
reducer.py:
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Place some text files into input dir in HDFS.
Script to run it:
#!/bin/bash export HADOOP_MAPRED_HOME='/opt/cloudera/parcels/CDH-4.3.0-1.cdh4.3.0.p0.22/lib/hadoop-0.20-mapreduce' hadoop jar $HADOOP_MAPRED_HOME/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.3.0.jar \ -file /home/ggdagw/mapper.py \ -mapper mapper.py \ -file /home/ggdagw/reducer.py \ -reducer reducer.py \ -input /user/ggdagw/input/* \ -output /user/ggdagw/output-wc-streaming
Hive
Hive is an SQL-like interface to data stored in HDFS. Below is an example of using Hive with the planets table that we saw earlier when considering SQLite and MySQL.
[ggdagw@hd-37-00 ~]$ hive hive>
hive> CREATE DATABASE menagerie;
OK
Time taken: 0.115 seconds
hive> use menagerie;
OK
Time taken: 0.061 seconds
hive> CREATE TABLE planets (Id INT, Name STRING,
> Diameter FLOAT,
> Mass FLOAT,
> Orbital_Period FLOAT)
> ROW FORMAT delimited fields terminated by ',' STORED AS TEXTFILE;
hive> LOAD DATA LOCAL INPATH 'planets.csv' INTO TABLE planets;
hive> SELECT * FROM planets;
OK
1 'Mercury' 0.382 0.06 0.24
2 'Venus' 0.949 0.82 0.72
3 'Earth' 1.0 1.0 1.0
4 'Mars' 0.532 0.11 1.52
5 'Jupiter' 11.209 317.8 5.2
6 'Saturn' 9.449 95.2 9.54
7 'Uranus' 4.007 14.6 19.22
8 'Neptune' 3.883 17.2 30.06
Time taken: 0.281 seconds
Summary
- Use large files whenever possible.
- Disks are poor at servicing a large number of seek requests.
- Check that you're making best use of a computer's memory hierarchy, i.e.:
- Think about locality of reference.
- Go to main memory as infrequently as possible.
- Go to disk as infrequently as possible as possible.
- Check that your are still using the right tools if your data grows.