Thursday, November 28, 2013

Identifying Duplicate Records with Fuzzy Matching

Fuzzy Entity Matching

The details of the matching algorithms can be found in my earlier posts. For an entity, various attribute types are supported, including integer, double, categorical, text, time, location, etc. Essentially, for any pair of entities, a distance is calculated between corresponding attributes. The attribute-wise distances are then aggregated over all the attributes of an entity to find the distance between two entities.
Why is Jaccard distance not appropriate for this use case? Given two blocks of text, Jaccard similarity depends on the number of words common to both and the union of words in the two text blocks. Two words will not match whether they differ by one character or by many. Levenshtein distance gives us a more fine-grained text matching algorithm.
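To make the contrast concrete, here is a minimal, hypothetical Java sketch (not part of the matching library) of word-level Jaccard similarity; a single-character typo turns a word into a complete mismatch, which is why Jaccard is too coarse for this use case.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardDemo {
    // Word-level Jaccard similarity: |intersection| / |union|
    static double jaccard(String a, String b) {
        Set<String> wa = new HashSet<>(Arrays.asList(a.toLowerCase().split("\\s+")));
        Set<String> wb = new HashSet<>(Arrays.asList(b.toLowerCase().split("\\s+")));
        Set<String> union = new HashSet<>(wa);
        union.addAll(wb);
        Set<String> inter = new HashSet<>(wa);
        inter.retainAll(wb);
        return union.isEmpty() ? 1.0 : (double) inter.size() / union.size();
    }

    public static void main(String[] args) {
        // "2934" and "2935" differ by one character, but Jaccard treats them
        // as completely different tokens, so the similarity drops sharply.
        System.out.println(jaccard("2934 Encino Pt", "2935 Encino Pt")); // 0.5
    }
}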

Levenshtein Distance

Given two words, it is defined as the number of character inserts and deletes necessary to make one word the same as the other. The Levenshtein distance between two words w1 and w2 is calculated as follows
distance(w1,w2) = length(w1) + length(w2) – 2 * maxCommonSubSeqLength(w1,w2)
where
maxCommonSubSeqLength is the length of the longest common subsequence of w1 and w2.
The distance is normalized by dividing it by the sum of the lengths of the two words, as below, which forces the normalized distance to always be between 0 and 1.0.
normDistance(w1,w2) = distance(w1,w2) / (length(w1) + length(w2))
For a pair of text blocks, the normalized distance is calculated between corresponding word pairs. The final distance between two text fields is the distance averaged over all the words. The implementation is in the class EditDistanceSimilarity. Various attribute distance algorithms are supported, e.g., Euclidean, Manhattan, etc.
This is not the only way to calculate Levenshtein distance. It is also possible to take the whole text field as one token and calculate the distance by setting the parameter edit.dist.token to false. One word of caution: edit distance computation time grows non-linearly with token length, so treating the whole text field as one token may be very computationally intensive.
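As an illustration of the two formulas above, here is a small self-contained Java sketch (for illustration only; it is not the EditDistanceSimilarity class) that computes the longest common subsequence length, the distance, and the normalized distance.

public class WordDistance {
    // Length of the longest common subsequence of w1 and w2 (dynamic programming).
    static int maxCommonSubSeqLength(String w1, String w2) {
        int[][] dp = new int[w1.length() + 1][w2.length() + 1];
        for (int i = 1; i <= w1.length(); i++) {
            for (int j = 1; j <= w2.length(); j++) {
                if (w1.charAt(i - 1) == w2.charAt(j - 1)) {
                    dp[i][j] = dp[i - 1][j - 1] + 1;
                } else {
                    dp[i][j] = Math.max(dp[i - 1][j], dp[i][j - 1]);
                }
            }
        }
        return dp[w1.length()][w2.length()];
    }

    // distance(w1,w2) = length(w1) + length(w2) - 2 * maxCommonSubSeqLength(w1,w2)
    static int distance(String w1, String w2) {
        return w1.length() + w2.length() - 2 * maxCommonSubSeqLength(w1, w2);
    }

    // normDistance(w1,w2) = distance(w1,w2) / (length(w1) + length(w2)), always in [0, 1]
    static double normDistance(String w1, String w2) {
        int totalLen = w1.length() + w2.length();
        return totalLen == 0 ? 0.0 : (double) distance(w1, w2) / totalLen;
    }

    public static void main(String[] args) {
        System.out.println(distance("2934", "2935"));      // 2
        System.out.println(normDistance("2934", "2935"));  // 0.25
    }
}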

Similarity Map Reduce

The class SameTypeSimilarity contains the implementation. The details of its inner workings can be found in my earlier posts. Essentially, it does a hash-based self join and works as follows:
1. We configure a set of buckets.
2. Each record is hashed into one of the buckets.
3. A bucket pair and the associated records are processed in one reducer call.
4. The reducer pairs records from each bucket in a nested loop and calculates distances.
It turns out that proper configuration of the number of buckets is critical for this example. The number of buckets should be large, so that each reducer call does not have to process too many records. The number of buckets is set using the parameter bucket.count. While running the map reduce, I had the Hadoop job tracker terminating a reducer task because of the heartbeat timeout, until I increased the bucket count.
As a rule of thumb, I would suggest choosing the number of buckets such that each bucket has about 4 or 5 records. Otherwise you may have the unfortunate experience of a reducer taking too much processing time in one call and eventually being terminated due to timeout.
In this map reduce, every record is compared with every other record, so the complexity is O(n x n). However, for our particular example, if we had two data sets that are supposed to be identical, a simpler approach could be taken: we could simply compare corresponding records from each set. The complexity in that case would have been O(n).
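For intuition, here is a simplified, single-process Java sketch of the bucketing steps listed above (hypothetical names, not the actual SameTypeSimilarity MapReduce code): records are hashed into buckets, and each bucket pair is then compared with a nested loop, which is roughly what one reducer call does.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketedSelfJoin {
    public static void main(String[] args) {
        int bucketCount = 3;  // corresponds to the bucket.count parameter
        List<String> records = List.of("recA", "recB", "recC", "recD", "recE");

        // Step 2: hash each record into one of the buckets.
        Map<Integer, List<String>> buckets = new HashMap<>();
        for (String rec : records) {
            int b = Math.floorMod(rec.hashCode(), bucketCount);
            buckets.computeIfAbsent(b, k -> new ArrayList<>()).add(rec);
        }

        // Steps 3 and 4: each bucket pair (including a bucket with itself) is handled
        // by one reducer call, which pairs the records in a nested loop.
        for (int i = 0; i < bucketCount; i++) {
            for (int j = i; j < bucketCount; j++) {
                List<String> left = buckets.getOrDefault(i, List.of());
                List<String> right = buckets.getOrDefault(j, List.of());
                for (String r1 : left) {
                    for (String r2 : right) {
                        if (i == j && r1.compareTo(r2) >= 0) continue; // skip self and duplicate pairs
                        System.out.println(r1 + " <-> " + r2 + " : " + distance(r1, r2));
                    }
                }
            }
        }
    }

    // Placeholder for the aggregated record distance described earlier.
    static double distance(String r1, String r2) {
        return r1.equals(r2) ? 0.0 : 1.0;
    }
}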

Attribute Distance Threshold

One way to optimize the problem and reduce the complexity to almost O(n) is to abandon distance processing between two records as soon as the distance between an attribute pair is found to be above a predefined threshold.
For example, if the distance between the name fields of two customer records is large enough, we can skip processing the remaining attributes and set the distance between the two records to a large value.
The threshold value is defined in the metadata JSON file as below. The threshold can be set for any number of attributes. This is how it is set for the name field of a customer record; a simplified sketch of the short-circuit logic follows the JSON snippet.
{
    "name" : "name",
    "ordinal" : 1,
    "dataType" : "text",
    "distThreshold" : 0.4
}
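The short-circuit idea can be sketched in Java as follows. This is a hypothetical simplification rather than the library code; in the real MapReduce the savings come from never computing the remaining attribute distances at all.

public class ThresholdShortCircuit {
    static final double MAX_DISTANCE = 1.0;  // large value assigned once a threshold is exceeded

    // attrDistances: per-attribute distances between two records
    // thresholds: per-attribute thresholds, e.g. 0.4 for the name field; Double.MAX_VALUE means "no threshold"
    static double recordDistance(double[] attrDistances, double[] thresholds) {
        double sum = 0.0;
        for (int i = 0; i < attrDistances.length; i++) {
            if (attrDistances[i] > thresholds[i]) {
                // Abandon this record pair; in the real implementation the remaining
                // attribute distances would never even be computed.
                return MAX_DISTANCE;
            }
            sum += attrDistances[i];
        }
        return sum / attrDistances.length;  // simple average aggregation, for illustration only
    }

    public static void main(String[] args) {
        double[] distances = {0.6, 0.0, 0.1};   // the name distance 0.6 exceeds its 0.4 threshold
        double[] thresholds = {0.4, Double.MAX_VALUE, Double.MAX_VALUE};
        System.out.println(recordDistance(distances, thresholds));  // 1.0
    }
}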

Two Near Identical Data Sets

If you have two data sets where the records are supposed to be identical except for small differences, the processing can be performed in O(n) time. This optimization is done by enabling inter set matching as below.
inter.set.matching=true
set.ID.size=1
The first parameter enables inter set matching. For this to work, the entity ID of each record needs to be encoded in a special way: the first few characters of the entity ID are the set ID, and the remaining characters comprise the real entity ID within a set. The length of the set ID is defined by the second parameter above.
Essentially, matching is performed between records from the two sets only if the entity IDs match after peeling off the set IDs. So we are finding distances only between corresponding entities from the two data sets.
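Here is a minimal Java sketch of the ID-peeling rule, assuming the one-character set ID configured above (illustration only, not the library code).

public class InterSetMatching {
    static final int SET_ID_SIZE = 1;  // corresponds to set.ID.size=1

    // True when two records from different sets refer to the same underlying entity,
    // i.e. their entity IDs match once the leading set ID characters are peeled off.
    static boolean shouldCompare(String id1, String id2) {
        String set1 = id1.substring(0, SET_ID_SIZE);
        String set2 = id2.substring(0, SET_ID_SIZE);
        String entity1 = id1.substring(SET_ID_SIZE);
        String entity2 = id2.substring(SET_ID_SIZE);
        return !set1.equals(set2) && entity1.equals(entity2);
    }

    public static void main(String[] args) {
        System.out.println(shouldCompare("106379", "206379"));  // true: sets 1 and 2, same entity 06379
        System.out.println(shouldCompare("106379", "258280"));  // false: different entities
    }
}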

Duplicate Customer Records

We will be using customer records as an example. Each record has the following fields. The first field, the ID, does not enter into the distance calculation in any way.
1.     ID
2.     Name
3.     Address
4.     City and state
5.     Phone number
The data consists of two sets of identical customer records. For some records, I introduced small typographical errors. Here is some sample input
106379,Richard Gomez,2934 Encino Pt,San Antonio TX 78259,210 4811932
158280,Jim Dobbs,2756 Bulls Bay Hwy,Jacksonville FL 32220,312 2850284
137943,Robert Lewis,194 Buckboard Dr,Augusta GA 30907,406 5404029
125849,Jena Marin,276 Durham St,Menlo Park CA 94025,650 2957406
156290,Dharam Patel,84 Prospect Hill Dr,Tewksbury MA 01876,718 4702915
.........
206379,Richard Gomez,2935 Encino Pt,San Antonio TX 78259,210 4811932
258280,Jim Dobbs,2756 Bulls Bay Hwy,Jacksonville FL 32220,312 2850284
237943,Robert Lewis,194 Buckboard Dr,Augusta GA 30908,406 5404029
225849,Jena Marin,276 Durham St,Menlo Park CA 94025,650 2957406
The output of the MapReduce class SameTypeSimilarity consists of an ID pair followed by the distance. There are three types of output, as follows:
1. Distance is 0 for identical records.
2. Distance is close to 0 for records that are identical except for small typographical errors.
3. Distance is high for genuinely different records.
The second type is of main interest to us. Because of the fuzzy matching logic, we are able to identify records as duplicates in spite of small typographical errors. Here is some sample output. The last column is the distance scaled by 1000.
106379,278253,757
106379,206379,16
178253,278253,0
178253,206379,757
278253,206379,757
168295,185062,612
We see all three types of output as described earlier. Let's pay attention to the records with IDs 106379 and 206379, which have a distance of 16 between them. This is a case of near-duplicate records with small typographical errors. Here are the two corresponding records. There is one wrong character in the address field.
106379,Richard Gomez,2934 Encino Pt,San Antonio TX 78259,210 4811932
206379,Richard Gomez,2935 Encino Pt,San Antonio TX 78259,210 4811932
That one changed character has caused the distance between the records to be 16 instead of 0.

Attribute Distance Aggregation

The attribute distances are aggregated over all attributes. There are different aggregation algorithms. For this example, I have used Manhattan distance, a.k.a. L1 distance.
All attributes are treated equally. However, it is possible to assign different attribute weights through the metadata. For example, if I had specified a weight greater than 1 (e.g., 1.2) for the phone number field, it would have shrunk that attribute's contribution to the distance. In other words, the distance between two phone numbers is made to appear smaller than the actual distance.
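Here is a small Java sketch of weighted Manhattan (L1) aggregation. Following the description above, a weight greater than 1 shrinks an attribute's contribution; dividing the attribute distance by its weight is one simple way to get that behavior, though whether the library does exactly this is an assumption here.

public class ManhattanAggregation {
    // Manhattan (L1) aggregation of attribute distances with optional per-attribute weights.
    // Dividing by the weight makes a weight above 1 shrink that attribute's contribution
    // (an assumption for illustration, not necessarily the library's exact formula).
    static double aggregate(double[] attrDistances, double[] weights) {
        double sum = 0.0;
        for (int i = 0; i < attrDistances.length; i++) {
            sum += attrDistances[i] / weights[i];
        }
        return sum / attrDistances.length;
    }

    public static void main(String[] args) {
        double[] dist = {0.0, 0.25, 0.0, 0.2};           // name, address, city/state, phone
        double[] equalWeights = {1.0, 1.0, 1.0, 1.0};
        double[] phoneWeighted = {1.0, 1.0, 1.0, 1.2};   // weight 1.2 on the phone field
        System.out.println(aggregate(dist, equalWeights));   // 0.1125
        System.out.println(aggregate(dist, phoneWeighted));  // ~0.104, the phone mismatch counts for less
    }
}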


Streaming SQL for Hadoop

Make the Elephant Fly. Real-time Big Data with SQLstream
‘Real-time’ and ‘Hadoop’ had been considered synonymous, yet Hadoop is not as real-time as many have hoped. Hadoop has many strengths, but was never intended for low latency, real-time analytics over high velocity machine data streams. With the SQL language emerging as the key enabler for the mainstream adoption of Hadoop, executing streaming SQL queries over Hadoop extends the platform out to the edge of the network, making it possible to query unstructured log file, sensor and network machine data sources on the fly and in real-time.
Real-time Operational Intelligence on Hadoop
SQLstream accelerates Hadoop to process live, high velocity unstructured data streams,  delivering the low latency, streaming operational intelligence demanded by today’s real-time businesses.
SQLstream for Hadoop combines SQLstream’s real-time operational intelligence from high velocity machine data with the power of Hadoop for high volume data storage and on-going analysis. SQLstream for Hadoop enables:
·        Stream persistence – Hadoop HBase as an active archive for streaming data and derived intelligence using the Flume API. SQLstream also performs continuous aggregation to support high velocity streams without data loss.
·        Stream replay – restream the complete history of persisted streams from HBase for ‘fast forwarding’ of time-based and spatial analytics. Various interfaces can be utilized, including Cloudera’s Impala.
·        Streaming data queries, joining streaming real-time data with historical streams and intelligence persisted in HBase.


 From SQL to NoSQL to Streaming SQL
The first phase of Hadoop and Big Data saw the emergence of NoSQL data storage platforms, looking to overcome the rigidity of normalized schemas. However, as the technology hits mainstream industry, the need for simpler, high performance and reliable queries is driving a resurgence in SQL as the de facto language for Big Data processing (for example, Cloudera Impala and Google BigQuery). What is now apparent is that SQL is the ideal language for processing data streams using real-time, windows-based queries. The issue with normalization and rigid schemas is a non-issue for a streaming data platform – there are no tables, no data gets stored!
What is Streaming SQL?
SQL was developed to process stored data in a traditional RDBMS. It has a massive existing skills base, proven scalability and sophisticated dynamic query optimization. It also functions equally well, if not better, as a real-time stream computing query language. SQLstream’s ANSI SQL:2008 streaming SQL queries are exactly that – standards compliant. We test our SQL queries for standards compliance against the leading RDBMS SQL platforms. There are however two differences. SQLstream’s core s-Server stream computing platform does not persist any data before processing (Hadoop HBase is the default storage platform for stream persistence although any data storage platform can be supported), and streaming SQL queries execute continuously, processing new data as they are created. So why SQL as a stream computing language?
·        Proven scalability with sophisticated query optimization.
·        Rapid development – a few SQL rules have immense power.
·        SQL skills are readily available in the marketplace worldwide.
·        Supports direct migration of SQL applications to and from existing databases and data warehouses.
A Streaming SQL Example
The following query is a basic example of a streaming SQL query. The query finds orders from New York that ship within one hour. Unlike a traditional static SQL query, this query executes continuously, processing new data as they arrive across all streams in the join, and pushing out results as the query condition is met. The keyword STREAM is used to maintain standards compatibility; without it, the query would return a table rather than a stream of results that continues ad infinitum.
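A sketch of such a query might look like the following; the stream names (Orders, Shipments), the columns, and the window clause are illustrative assumptions rather than SQLstream's published example.

-- Orders from New York that ship within one hour of the order
SELECT STREAM o.orderId, o.ROWTIME AS orderTime, s.ROWTIME AS shipTime
FROM Orders AS o
  JOIN Shipments OVER (RANGE INTERVAL '1' HOUR FOLLOWING) AS s
    ON o.orderId = s.orderId
WHERE o.city = 'New York';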

Streaming SQL supports all standard SQL operations for data streams, including:
·        Stream Select, Insert and Update
·        Stream Join
·        Streaming Partition By and Group By
·        Full set of arithmetic, string, logical, date and timestamp operators
·        Support for User Defined Functions (UDXes)
Streaming SQL queries over Hadoop
SQLstream s-Server, our core stream computing platform, operates both as a streaming Big Data engine and as a streaming SQL language extension for Hadoop HBase. In Hadoop mode, Hadoop HBase is utilized as the default platform for stream persistence.
Data can be streamed directly into Hadoop HBase in real-time, including the raw machine data as it is collected from log files, applications and sensors, filtered and enhanced versions of the same streams, as well as any pre-aggregated and analytical intelligence information. SQLstream's streaming SQL language support for Hadoop offers:
·        Real-time operational intelligence on Hadoop without low-level coding
·        Stream persistence for all raw machine data and derived intelligence information
·        SQLstream Connector for Hadoop HBase to maintain and utilize your Big Data storage platforms in real-time.
·        Streaming integration between Big Data storage platforms.
·        Replay persisted streams for time-based and geospatial analysis of existing stored data.
A key advantage with SQLstream is the ability to extract and replay processed data from Big Data storage platforms and join this information with the incoming, live data streams. Operational intelligence results are enhanced by comparing real-time data against known trends, eliminating false alarms and enabling longer term comparisons. The extraction and data processing in SQLstream uses standards-based SQL queries, enabling powerful real-time queries to be deployed over streaming and stored data.


Wednesday, November 27, 2013

GIS Tools for Hadoop

Looking at data without location often seems like looking at just part of a story. Including location and geography in analysis reveals patterns and associations that otherwise are missed. As Big Data emerges as a new frontier for analysis, including location in Big Data is becoming significantly important.
Data that includes location, and that is enhanced with geographic information in a structured form, is often referred to as spatial data. Doing analysis on spatial data requires an understanding of geometry and the operations that can be performed on it. Enabling Hadoop to include spatial data and spatial analysis is the goal of this Esri open source effort.
GIS Tools for Hadoop is an open source toolkit intended for Big Spatial Data Analytics. The toolkit provides different libraries:
  • Esri Geometry API for Java: A generic geometry library that can be used to extend Hadoop core with vector geometry types and operations, and enables developers to build MapReduce applications for spatial data.
  • Spatial Framework for Hadoop: Extends Hive, based on the Esri Geometry API, to enable Hive Query Language users to leverage a set of analytical functions and geometry types, in addition to some utilities for the JSON format used in ArcGIS.
  • Geoprocessing Tools for Hadoop: Contains a set of ready-to-use ArcGIS geoprocessing tools, based on the Esri Geometry API and the Spatial Framework for Hadoop. Developers can download the source code of the tools and customize it; they can also create new tools and contribute them to the open source project. Through these tools, ArcGIS users can move their spatial data and execute a pre-defined workflow inside Hadoop.
The GIS Tools for Hadoop toolkit allows users who want to leverage the Hadoop framework to do spatial analysis on spatial data; for example:
1. Run filter and aggregate operations on billions of spatial data records inside Hadoop based on spatial criteria.
2. Define new areas represented as polygons, and run point-in-polygon analysis on billions of spatial data records inside Hadoop.
3. Visualize analysis results on a map with rich styling capabilities and a rich set of base maps.
4. Integrate your maps in reports, or publish them as map applications online.
Getting started
Developers can get started at Spatial Framework for Hadoop.
ArcGIS users can get started at Geoprocessing Tools for Hadoop.
How does it all work?
Overall there are four GitHub projects that make up the toolkit.
Firstly, the Esri Geometry API for Java project. This is a generic library that includes geometry objects, spatial operations, and spatial indexing; it can be used to spatially enable Hadoop. By deploying the Esri Geometry API library (as a jar) within Hadoop, developers are able to build Map/Reduce applications that are spatially enabled, leveraging the Esri Geometry API along with the other Hadoop APIs in their applications.
Secondly, the Spatial Framework for Hadoop project. This library includes the user-defined objects that extend Hive with the capabilities of the Esri Geometry API. By enabling this library in Hive, users are able to construct very SQL-like queries using HQL. In this case, users don't have to write a Map/Reduce application; they can interact with Hive, write their SQL-like queries, and get answers directly from Hadoop. Queries in this case can include spatial operations and values.
Thirdly, the Geoprocessing Tools for Hadoop project. These tools are specifically used in ArcGIS. Through the tools, users can connect to Hadoop from ArcGIS. Connecting to Hadoop from ArcGIS is really useful to toolkit users, since they can import their analysis results into ArcGIS for visualization. They can also do more complex and sophisticated analysis now that they have narrowed down their data to a specific subset. Additionally, users can leverage the ArcGIS platform capabilities to publish their maps to web and mobile apps, and can integrate them with BI reports.
Finally, the GIS Tools for Hadoop project. This project is intended as a place to include multiple samples that leverage the toolkit. The samples can leverage the low-level libraries or the geoprocessing tools. A couple of samples are available to help you test the deployment of the spatial libraries with Hadoop and Hive, and to make sure everything runs with no issues before you start leveraging the setup from your HQL queries or from the GP tools. To check your deployment for Hive and GP tools usage, the sample point-in-polygon-aggregation-hive can be utilized. The sample leverages the data and lib directories on the same path.


Pigeon

Pigeon is a spatial extension to Pig that allows it to process spatial data. All functionality in Pigeon is introduced as user-defined functions (UDFs), which makes it unobtrusive and allows it to work with your existing systems. All the spatial functionality is supported by the Esri Geometry API, a native Java open source library for spatial functionality licensed under the Apache Public License.
Our target is to have something like PostGIS, but for Pig instead of PostgreSQL. We use the same function names to make it easier for existing users to use Pigeon. Here is an example that computes the union of all ZIP codes in each city.
zip_codes = LOAD 'zips' AS (zip, city, geom);
zip_by_city = GROUP zip_codes BY city;
zip_union = FOREACH zip_by_city
GENERATE group AS city, ST_Union(geom);

Data types
Currently, Pig does not support the creation of custom data types. This is not ideal for Pigeon, because we wanted to have our own data type (Geometry), similar to PostGIS. As a workaround, we use the more generic type bytearray as our main data type. All conversions happen from bytearray to Geometry and vice versa on the fly while the function is executed. If a function expects an input of type Geometry, it receives a bytearray and converts it to Geometry. If the output is of type Geometry, it computes the output, converts it to bytearray, and returns that bytearray instead. This is a little bit cumbersome, but hopefully the Pig team will add support for custom data types so that we can have a cleaner extension.
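To make the workaround concrete, here is a schematic Java sketch of a Pigeon-style UDF (hypothetical; the real Pigeon UDFs differ in detail and do not use WKT as their byte encoding): the input bytearray is decoded to a Geometry, the spatial operation runs on the Geometry, and the result is re-encoded to a bytearray before being handed back to Pig.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;

import com.esri.core.geometry.Geometry;
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.SpatialReference;

// A buffer-like UDF sketch: Pig only ever sees bytearray in and bytearray out.
public class BufferSketch extends EvalFunc<DataByteArray> {
    @Override
    public DataByteArray exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2 || input.get(0) == null) {
            return null;
        }
        // Decode the incoming bytearray; WKT is used here purely for readability.
        String wkt = new String(((DataByteArray) input.get(0)).get(), StandardCharsets.UTF_8);
        Geometry geom = GeometryEngine.geometryFromWkt(wkt, 0, Geometry.Type.Unknown);

        // Do the actual spatial work on the Geometry object.
        double dist = ((Number) input.get(1)).doubleValue();
        Geometry buffered = GeometryEngine.buffer(geom, SpatialReference.create(4326), dist);

        // Re-encode the result as a bytearray for Pig.
        return new DataByteArray(GeometryEngine.geometryToWkt(buffered, 0).getBytes(StandardCharsets.UTF_8));
    }
}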

How to compile
Pigeon requires ESRI Geometry API to compile. You need to download a recent version of the library and add it to the classpath of your Java compiler to be able to compile the code. Of course you also need Pig classes to be available in the classpath. The current code is tested against ESRI Geometry API 1.0 and Pig 0.11.1. Currently you have to do the compilation manually but we are planning to create an ANT build file to automate the compilation. Once you compile the code, you can create a jar file out of it and REGISTER it in your Pig scripts.

How to use
To use Pigeon in your Pig scripts, you need to REGISTER the jar file in your Pig script. Then you can use the spatial functionality in your script as you do with normal functionality. Here are some simple examples of how to use Pigeon.

Let's say you have a trajectory in the form (latitude, longitude, timestamp). We need to form a linestring out of this trajectory, with the points in the linestring sorted by timestamp.

points = LOAD 'trajectory.tsv' AS (time: datetime, lat:double, lon:double);
s_points = FOREACH points GENERATE ST_MakePoint(lat, lon) AS point, time;
points_by_time = ORDER s_points BY time;
points_grouped = GROUP points_by_time ALL;
lines = FOREACH points_grouped GENERATE ST_AsText(ST_MakeLine(points_by_time));
STORE lines INTO 'line';
Supported functions
Here is a list of all functions that are currently supported.

Basic Spatial Functions

ST_AsHex: Converts a shape to its Well-Known Binary (WKB) format encoded as a hex string
ST_AsText: Converts a shape to its Well-Known Text (WKT) format
ST_MakePoint: Creates a geometry point given two numeric coordinates
ST_Area: Calculates the area of a surface shape (e.g., polygon)
ST_Envelope: Calculates the envelope (MBR) of a shape
ST_Buffer: Computes a buffer with the specified distance around a geometry
ST_Size: Returns the number of points in a linestring
Spatial Predicates

ST_Crosses: Checks if one polygon crosses another polygon
ST_IsEmpty: Tests whether a shape is empty or not
Spatial Analysis

ST_Buffer: Computes a buffer with the specified distance around a geometry
ST_ConvexHull: Computes the minimal convex polygon of a shape
Aggregate functions

ST_MakeLine: Creates a linestring given a bag of points
ST_MakePolygon: Creates a polygon given a circular list of points
ST_ConvexHull: Computes the convex hull of a bag of shapes
ST_Union: Computes the spatial union of a set of surfaces (e.g., polygons)
ST_Extent: Computes the minimal bounding rectangle (MBR) of a set of shapes

Tuesday, November 26, 2013

SpatialHadoop

Usage examples


Once you have SpatialHadoop configured correctly, you are ready to run some sample programs. The following steps will generate a random file, index it using a Grid index, and run some spatial queries on the indexed file. The classes needed for this example are all contained in the spatialhadoop*.jar shipped with the binary release. You can type 'bin/hadoop jar spatialhadoop*.jar' to get the usage syntax for the available operations.
To generate a random file containing random rectangles, enter the following command
$ bin/hadoop jar spatialhadoop*.jar generate test mbr:0,0,1000000,1000000 size:1.gb shape:rect
This generates a 1GB file named 'test', where all rectangles in the file are contained in the rectangle with corner at (0,0) and dimensions 1Mx1M units.

If you have your own file that needs to be processed, you can upload it the same way you do with traditional Hadoop by typing
$ bin/hadoop fs -copyFromLocal <local file path> <HDFS file path>
Then you can index this file using a Grid index by typing:
$ bin/hadoop jar spatialhadoop*.jar index test test.grid mbr:0,0,1000000,1000000 sindex:grid
To see how the grid index partitions this file, type:
$ bin/hadoop jar spatialhadoop*.jar readfile test.grid
This shows the list of partitions in the file, each defined by its boundaries, along with the number of blocks in each partition.

To run a range query operation on this file
$ bin/hadoop jar spatialhadoop*.jar rangequery test.grid rq_results rect:500,500,1000,1000
This runs a range query over this file with the query range set to the rectangle at (500,500) with dimensions 1000x1000. The results will be stored in an HDFS file named 'rq_results'.

To run a knn query operation on this file
$ bin/hadoop jar spatialhadoop*.jar knn test.grid knn_results point:1000,1000 k:1000
This runs a knn query where the query point is at (1000,1000) and k=1000. The results are stored in the HDFS file 'knn_results'.

To run a spatial join operation
First, generate another file and have it indexed on the fly using the command
$ bin/hadoop jar spatialhadoop*.jar generate test2.grid mbr:0,0,1000000,1000000 size:100.mb sindex:grid
Now, join the two files via the Distributed Join algorithm using the command
$ bin/hadoop jar spatialhadoop*.jar dj test.grid test2.grid sj_results