Thursday, 9 February 2017

Dynamic vs Static partitioning in Hive

Partitioning: An Anecdote for performance

Hive supports two types of partitioning: static and dynamic.

With either type, queries that filter on the partition columns run against only a portion of the data, which can provide significant performance gains.

Static Partitioning—Used when the values for partition columns are known well in advance of loading the data into a Hive table

Dynamic Partitioning—Used when the values for partition columns are known only during loading of the data into a Hive table

When to choose Dynamic Partitioning?

Deciding when to use dynamic partitioning can be very challenging. We recommend using dynamic partitioning in these common scenarios:

Loading from an existing table that is not partitioned: In this scenario, the user doesn’t employ partitioning initially because the table in question is expected to remain relatively small. However, over the course of time, the table grows quite large and performance issues begin to appear. These issues should be corrected using a one-time load to dynamically partition the table.
Unknown values for partition columns: In some scenarios, it’s very difficult to know the unique values of all partition columns unless the data is manually inspected. As you can imagine, manual inspection isn’t a realistic option when dealing with batch pipelines or terabytes of data. You can try writing a Hive query to retrieve all unique value sets of partition columns. Let’s say, the query result contains many unique value sets. You’ll end up creating and executing an “ALTER TABLE” statement for each unique value set. Running a Hive query, preparing alter table statements, and executing them will significantly delay your data pipelines. Also, it’s an error-prone and cumbersome process. Hence, static partitioning is not used under these circumstances. However, dynamic partitioning can come to your rescue if you’d like to offload this work to Hive. Hive can detect the unique value sets for partition columns and create partitions on-the-fly.
Let’s revisit the scenario we discussed earlier in this blog. Imagine you have a very big table with data accumulated over many years. You want to improve the performance by partitioning the data. To complicate the scenario further, imagine you decide to not only partition the data by year, but also by month, day, hour, and advertiser id (let’s assume your data has an advertiser_id column). In this particular case, dynamic partitioning is very helpful. Why? You may be running a pipeline that ingests data hourly. You know exactly which year, month, day, and hour the data belongs to, but you can’t assume or tell which advertiser_id each record contains unless you manually inspect the data. Hive can automatically partition the data on all the required columns if dynamic partitioning is used.

Modifying the number of partition columns: In this scenario, the user initially designs a table with a limited number of partition columns. As the data grows, the user decides to address performance concerns by adding one or more partition columns. This is done in Hive by: 1. creating a new table with all required partition columns, 2. loading data into the new table from the already existing partitioned table, and 3. deleting the existing table. Dynamic partitioning should be used to perform step (2), as explained earlier.
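
To make the manual route from the "Unknown values for partition columns" scenario concrete, here is a minimal shell sketch of what it involves. It assumes the unique value sets have already been exported from Hive as comma-separated "year,month" lines; the function name gen_add_partitions, the table name events, and the two-column layout are hypothetical and chosen only for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one ALTER TABLE ... ADD PARTITION statement
# for each unique (year, month) pair read from stdin.
# Input format (an assumption): comma-separated "year,month" lines,
# for example the exported result of a SELECT DISTINCT query.
# $1 is the name of the target table.
gen_add_partitions() {
    local table="$1"
    sort -u | while IFS=, read -r year month; do
        # Skip blank lines in the export.
        [ -z "$year" ] && continue
        printf 'ALTER TABLE %s ADD IF NOT EXISTS PARTITION (year=%s, month=%s);\n' \
            "$table" "$year" "$month"
    done
}
```

Piping an export through gen_add_partitions events produces one statement per unique pair, and each of those statements still has to be executed before the data can be loaded. Dynamic partitioning removes this whole step.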

Avoiding Dynamic Partitioning Pitfalls

So far, we’ve covered the advantages of dynamic partitioning. Now, let’s look at some common mistakes and pitfalls that users often encounter. The most common mistake made when using dynamic (or even static) partitions is to specify an existing column name as a partition column. In Hive, partition columns are virtual columns: they are declared only in the “PARTITIONED BY” clause, not in the regular column list, and they help with organizing and fetching the data efficiently. If you see the following error while creating a table with partitions, it means a column named in the partition specification is also a regular column in the table schema:

"Error in semantic analysis: Column repeated in partitioning columns"

Some other important points to remember when using dynamic partitions are:

Typically, data is loaded into a partitioned table in Hive with an “INSERT OVERWRITE TABLE <table_name> PARTITION (col1=value1, col2=value2…) SELECT <columns> FROM <source_table>…” query. As mentioned above, when using dynamic partitioning, values for the dynamic partition columns must not be specified: only the column names are listed, as in PARTITION (col1, col2), and Hive detects the values automatically from the data.
In order to detect the values for partition columns automatically, partition columns must be specified at the end of the “SELECT” clause in the same order as they are specified in the “PARTITIONED BY” clause while creating the table.
If the table has only dynamic partition columns, then the configuration setting hive.exec.dynamic.partition.mode should be set to nonstrict (the value is written without a hyphen):
SET hive.exec.dynamic.partition.mode=nonstrict;

Hive enforces a limit on the number of dynamic partitions it can create. The default is 100 dynamic partitions per node, with a total (default) limit of 1000 dynamic partitions across all nodes. However, this setting is configurable. If your job tries to create more partitions than allowed, you may see the following exception:

metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions. The maximum number of dynamic partitions is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. Maximum was set to: 100

The setting hive.exec.max.dynamic.partitions controls the total number of dynamic partitions for a table, whereas hive.exec.max.dynamic.partitions.pernode controls the maximum number of dynamic partitions that can be created on a node. So, to fix the above exception, try gradually increasing the number of partitions allowed using the above settings:

SET hive.exec.dynamic.partition=true;
SET hive.exec.max.dynamic.partitions=2048;
SET hive.exec.max.dynamic.partitions.pernode=256;
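
Before raising these limits, it can help to estimate how many partitions a load would actually create. The following is a rough shell sketch, assuming a comma-delimited sample of the incoming data is available locally; the function names and the column position argument are hypothetical, and on a real cluster the same count would more likely come from a Hive query.

```shell
#!/usr/bin/env bash
# Count the distinct values of one column in comma-delimited data on stdin.
# $1 is the 1-based position of the candidate partition column
# (an assumption about the file layout).
count_distinct_values() {
    cut -d, -f"$1" | sort -u | wc -l | tr -d ' '
}

# Report whether the number of distinct values stays within a limit.
# $1 = column position, $2 = limit to compare against, for example the
# value configured for hive.exec.max.dynamic.partitions.
check_partition_limit() {
    local n
    n="$(count_distinct_values "$1")"
    if [ "$n" -le "$2" ]; then
        echo "ok: $n distinct values (limit $2)"
    else
        echo "too many: $n distinct values (limit $2)"
    fi
}
```

A sample only gives a lower bound on the real number of partition values, so a result close to the limit is already a warning sign.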

Now, let’s look at a complete example demonstrating how to create and load data using dynamic partitioning.

Change the dynamic partition mode to nonstrict:

SET hive.exec.dynamic.partition.mode=nonstrict;

Create the table with partitions:

CREATE TABLE patents (
    citing_patent      INT,
    cited_patent       INT,
    assignee           STRING,
    companyname        STRING,
    publication_date   STRING)
PARTITIONED BY (
    year  INT,
    month INT,
    day   INT);

Load the data:

INSERT OVERWRITE TABLE patents PARTITION (year, month, day)
SELECT citing, cited, name, company, publication_date, year(publication_date), month(publication_date), day(publication_date)
FROM patents_raw_data;

The table, “patents_raw_data”, is an external table, which points to patent raw data. Notice the order of the partition columns specified in the “SELECT” clause is in exactly the same order as the partition columns specified in the “PARTITIONED BY” clause in create table query. Also, the columns year, month, and day are purposefully specified at the very end in the “SELECT” clause. Hive splits the data into multiple partitions by year, month, and day values. It also updates the Hive metastore automatically without explicit user intervention.
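
Hive stores each partition in its own sub-directory under the table's directory, named after the partition column values (key=value for each level). As a small illustration of how a row's publication_date decides where the row lands, the sketch below derives that sub-directory from a date. It assumes dates are formatted as YYYY-MM-DD; the function name partition_path is hypothetical.

```shell
#!/usr/bin/env bash
# Print the partition sub-directory that corresponds to a date.
# $1 is a date in YYYY-MM-DD form (an assumption about the data).
partition_path() {
    local y rest m d
    y="${1%%-*}"        # text before the first dash
    rest="${1#*-}"      # text after the first dash
    m="${rest%%-*}"
    d="${rest#*-}"
    # year(), month() and day() return integers, so a leading zero
    # is dropped: "02" becomes 2.
    printf 'year=%d/month=%d/day=%d\n' "$y" "${m#0}" "${d#0}"
}
```

For example, partition_path 2017-02-09 yields the path of the partition for 9 February 2017, relative to the table's directory.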

Next, let’s look at what can happen with dynamic partitioning when the data is skewed. The data that belongs to one partition is sent to only one reducer. However, consider a scenario where 90 percent of the data belongs to only one partition and the rest is spread across multiple partitions. In this situation, one reducer will be heavily loaded, while all other reducers have finished their work. The time required to finish the job will depend solely on the longest running reducer. This will significantly increase the data load time.

To overcome this problem, we suggest our customers perform some queries on the data to check how evenly it’s distributed. The key idea is to distribute the data evenly across the reducers. To achieve even distribution, the table can be further divided by buckets or new partition columns. The solution will vary depending on the use case and the nature of the data. In one recent example, we worked with one of our customers to address their performance issues. After gaining a deeper understanding of their use case and data, we determined that the data was skewed and suggested bucketing the data by another column to evenly spread the data load across the reducers. This resolved the problem and significantly reduced their data load times.
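
One way to check how evenly the data is distributed is to count rows per candidate partition value. The sketch below does this in shell for a comma-delimited sample file; it is only an illustration under that assumption (values must not contain spaces or commas), the function names are hypothetical, and on a full dataset the equivalent check would normally be a grouped count run in Hive.

```shell
#!/usr/bin/env bash
# Print "<row count> <value>" for every value of column $1
# in comma-delimited data on stdin, largest count first.
rows_per_value() {
    cut -d, -f"$1" | sort | uniq -c | sort -rn | awk '{ print $1, $2 }'
}

# Print the share of rows (whole percent, rounded down) that fall
# into the single most common value of column $1.
top_share_percent() {
    cut -d, -f"$1" | sort | uniq -c | sort -rn |
        awk '{ total += $1; if (NR == 1) top = $1 }
             END { if (total) print int(top * 100 / total) }'
}
```

A top share near 90 percent matches the skewed scenario described above, where a single reducer ends up doing almost all of the work.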

There aren’t many significant downsides to using dynamic partitioning. The only word of caution here is, if you don’t know your data well, you could unintentionally implement bad partitioning strategies when using dynamic partitioning. Static partitioning is much more controlled and you have to know your data to define the partitions. With dynamic partitioning, a user might mistakenly assume that there will only be 10 partition values flowing in as part of a dataset, when a partition column actually has thousands of different values in the incoming data. This could create thousands of tiny partitions and have unintentional side-effects.

Overall, dynamic partitioning is a well-implemented and tested feature, but is often overlooked in practice. Due to its proven ability to significantly ease user pain and improve performance, we encourage our customers to use dynamic partitioning—in accordance with the best practices outlined above—whenever it fits their requirements.

Tuesday, 7 February 2017

Sqoop Commands


Sqoop eval command for MySQL with keystores and trustcenter

Step 2: Alter the table properties from internal to external, then either alter the file location or load the file that resides on a different path.

Friday, 3 February 2017

Performing SCD Type 2 in Hive


To begin with, here are a couple of approaches to implementing an SCD Type 2 dimension in Hive:
- Do everything in the RDBMS and just Sqoop the results (makes sense only if Hadoop is used for data storage and discovery).
- Bring in incremental data and rebuild the dimension every day (makes sense only when the dataset is not too large, it is always accessed in SCD Type 2 form, and there are many users).
- Bring in incremental data and make sense of it when you need it (makes sense regardless of how big the data is or how many users access it in SCD Type 2 fashion).

Now that we have settled on the last approach, let's dive deeper into the technical specifics of how to do it. It turns out you can implement an SCD Type 2 schema on read using a Hive SerDe, a UDF, or the Hive TRANSFORM function. Let's go through the process step by step to uncover the finer details:
- Bring in the external data source.
Let's assume the original dataset of a company that tracks its employees' onsite engagements looks as follows:
main_file.txt
1,Victor,CyberTron,2014-02-18,Chicago
2,Natalie,CyberTron,2014-01-01,NewYork
3,Nathan,CyberTron,2014-01-01,NewYork
4,Sam,Acme,2011-01-01,Boston
4,Sam,Acme,2014-06-01,Atlanta

change_file.txt 
1,Victor,Acme,2014-06-09,Chicago
2,Natalie,Acme,2014-06-01,NewYork
3,Nathan,Acme,2014-06-01,NewYork
6,Hammond,Acme,2014-07-01,NewYork

- Load both files into an external table, then insert the rows into a table bucketed on Id, as follows:

CREATE TABLE IF NOT EXISTS VJ_Main (
    Id        INT,
    Name      STRING,
    Employer  STRING,
    EffDt     STRING,
    City      STRING)
CLUSTERED BY (Id)
SORTED BY (EffDt)
INTO 4 BUCKETS
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Query the table to check its contents:
hive -e "select * from vj_main"
4  Sam      Acme       2011-01-01  Boston
4  Sam      Acme       2014-06-01  Atlanta
1  Victor   CyberTron  2014-02-18  Chicago
1  Victor   Acme       2014-06-09  Chicago
2  Natalie  CyberTron  2014-01-01  NewYork
2  Natalie  Acme       2014-06-01  NewYork
6  Hammond  Acme       2014-07-01  NewYork
3  Nathan   CyberTron  2014-01-01  NewYork
3  Nathan   Acme       2014-06-01  NewYork

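- At this step, we execute the following Hive query with the TRANSFORM function. The script compares each row with the one after it, so all rows for an Id must reach the same script instance in EffDt order; the DISTRIBUTE BY and SORT BY clauses make that explicit:

ADD FILE /home/cloudera/testhiveSCD/map_script.sh;
FROM (
    SELECT Id, Name, Employer, EffDt, City
    FROM VJ_Main
    DISTRIBUTE BY Id
    SORT BY Id, EffDt
) X
SELECT TRANSFORM (X.Id, X.Name, X.Employer, X.EffDt, X.City)
USING 'map_script.sh'
AS Id, Name, Employer, City, EffStartDt, EffEndDt;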

The bash script 'map_script.sh' is as follows:
#!/bin/bash
# Reads tab-separated rows (Id, Name, Employer, EffDt, City), ordered by Id and EffDt.
# Each row is held back until the next row arrives, so that its end date can be
# taken from the next row when both rows share the same Id.
unset prevId
while IFS= read -r line
do
    firstpart=$(printf '%s\n' "$line" | cut -f1-3)
    secondpart=$(printf '%s\n' "$line" | cut -f5-)
    EffDt=$(printf '%s\n' "$line" | cut -f4)
    RecId=$(printf '%s\n' "$line" | cut -f1)
    if [ "X${prevId}" != "X${RecId}" ]
    then
        # New Id: the previous row is the latest record for its Id.
        EffEndDt=""
    else
        # Same Id: the previous row ends where this one starts.
        EffEndDt="$EffDt"
    fi
    if [ -n "$prevId" ]
    then
        printf '%s\t%s\t%s\t%s\n' "$prevfirstpart" "$prevsecondpart" "$prevStartDt" "$EffEndDt"
    fi
    prevId="$RecId"
    prevfirstpart="$firstpart"
    prevsecondpart="$secondpart"
    prevStartDt="$EffDt"
done
# The last row has no successor, so its end date stays empty.
if [ -n "$prevId" ]
then
    printf '%s\t%s\t%s\t\n' "$prevfirstpart" "$prevsecondpart" "$prevStartDt"
fi

- After the query executes, the result appears as follows:
4  Sam      Acme       Boston   2011-01-01  2014-06-01
4  Sam      Acme       Atlanta  2014-06-01  (null)
1  Victor   CyberTron  Chicago  2014-02-18  2014-06-09
1  Victor   Acme       Chicago  2014-06-09  (null)
2  Natalie  CyberTron  NewYork  2014-01-01  2014-06-01
2  Natalie  Acme       NewYork  2014-06-01  (null)
6  Hammond  Acme       NewYork  2014-07-01  (null)
3  Nathan   CyberTron  NewYork  2014-01-01  2014-06-01
3  Nathan   Acme       NewYork  2014-06-01  (null)

Now, the output is in SCD Type 2 format. This entire process can also be done inside Hive, either with a Hive UDF or by overriding the Deserializer to implement the same windowing that the bash script performs. The only difference is that these implementations would be written in Java using StructObjectInspector instead of shell variables.
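
The look-ahead windowing itself is small enough to try out locally before wiring anything into Hive. The following is a minimal sketch of the same logic as a single awk pass wrapped in a shell function. It assumes tab-separated input in the column order Id, Name, Employer, EffDt, City, already sorted by Id and then EffDt; the function name scd2_window is hypothetical, and the end date of a current record is written as an empty field.

```shell
#!/usr/bin/env bash
# Read tab-separated rows (Id, Name, Employer, EffDt, City) from stdin,
# sorted by Id and then EffDt, and print
# Id, Name, Employer, City, EffStartDt, EffEndDt.
scd2_window() {
    awk -F'\t' -v OFS='\t' '
        # Print the held row with the given end date.
        function emit(enddt) { print id, name, emp, city, start, enddt }
        # From the second row on, close the held row: it ends where the
        # current row starts if both share an Id, otherwise it is current.
        NR > 1 { emit($1 == id ? $4 : "") }
        # Hold the current row until the next one has been seen.
        { id = $1; name = $2; emp = $3; start = $4; city = $5 }
        # The final row has no successor, so it is always current.
        END { if (NR > 0) emit("") }
    '
}
```

Feeding the sorted contents of main_file.txt and change_file.txt through scd2_window (after converting commas to tabs) is a quick way to compare a local result with what the TRANSFORM query returns.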

Spark Memory Management

 Spark’s performance advantage over MapReduce  is due to Spark’s In-Memory Persistence and Memory Management Rather than writing to disk ...