
Sunday, 1 September 2019

Spark Memory Management



Spark’s performance advantage over MapReduce is largely due to Spark’s in-memory persistence and memory management. Rather than writing to disk between each pass through the data, Spark has the option of keeping the data loaded in memory on the executors. That way, the data in each partition is available in memory each time it needs to be accessed.

Spark offers three options for memory management, each with different space and time trade-offs:


  1. In memory as deserialized Java objects
  2. As serialized data
  3. On disk


In memory as deserialized Java objects
The most intuitive way to store objects in RDDs is as the original deserialized Java objects defined by the driver program. This form of in-memory storage is the fastest, since it avoids serialization and deserialization time; however, it may not be the most memory efficient, since storing full Java objects takes more space than a compact serialized representation.

As serialized data
Spark objects are converted into streams of bytes as they are moved around the network, by default using the standard Java serialization library. Storing data in serialized form may be slower to access, since serialized data is more CPU-intensive to read than deserialized data, but it is more memory efficient, since it allows a more compact representation. While Java serialization is more space-efficient than storing full objects, Kryo serialization can be much more efficient than Java serialization.
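
Switching the serializer is a configuration change. Below is a minimal sketch, assuming a Scala Spark application; the Record case class is only an illustrative placeholder, not something from this post:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative placeholder type; register your own data classes instead.
case class Record(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("kryo-sketch")
  // Use Kryo instead of the default Java serialization.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes keeps Kryo from writing full class names into the stream.
  .registerKryoClasses(Array(classOf[Record]))

val spark = SparkSession.builder().config(conf).getOrCreate()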

On disk
RDDs whose partitions are too large to be stored in RAM on the executors can be written to disk. This strategy is obviously slower for repeated computations, but it can be more fault-tolerant for long sequences of transformations and may be the only feasible option for enormous computations.

The persist() function in the RDD class lets the user control how the RDD is stored. By default, persist() stores an RDD as deserialized objects in memory, but the user can pass one of the numerous storage options to the persist() function to control how the RDD is stored. We will cover the different options for RDD reuse in “Types of RDD Reuse: Cache, Persist, Checkpoint, Shuffle Files”. When persisting RDDs, the default implementation evicts the least recently used partition (LRU caching) if the space it occupies is required to compute or to cache a new partition. However, you can change this behavior and control Spark’s memory prioritization with the persistencePriority() function in the RDD class.
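
A minimal sketch of the three storage options through persist(), assuming a Scala spark-shell session where sc is the SparkContext; the data itself is just a placeholder:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000000)

// 1. In memory as deserialized Java objects (equivalent to cache()).
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.count()      // the first action materialises and stores the partitions

// The storage level of an already-persisted RDD cannot be changed,
// so unpersist before choosing a different level.
rdd.unpersist()

// 2. In memory as serialized bytes (more compact, more CPU to read back).
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count()
rdd.unpersist()

// 3. On disk, for partitions too large to keep in RAM.
rdd.persist(StorageLevel.DISK_ONLY)
rdd.count()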

Tuesday, 27 August 2019

Critical Spark Enhancements


Spark is extremely slow for wide data (more than ~15,000 columns and ~15,000 rows). Most genomics/transcriptomics data is wide, because the number of genes is usually above 20,000 and the number of samples often is as well. The popular GTEx dataset is a good example (see for instance the RNA-Seq data at https://storage.googleapis.com/gtex_analysis_v7/rna_seq_data, where .gct is just a .tsv file with two comment lines at the beginning). Everything done on wide tables (even a simple describe applied to all the gene columns) either takes hours or freezes (because of lost executors), irrespective of memory and number of cores, while the same operations run fast (minutes) with plain pandas (without any Spark involved).

https://issues.apache.org/jira/browse/SPARK-28547
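
For illustration only (the path is a placeholder, and this sketch assumes the two leading comment lines of the .gct file have already been stripped so that a plain tab-separated table remains), the kind of operation that degrades on wide tables is as simple as:

// Hypothetical wide table: ~20k gene columns, thousands of sample rows.
val wide = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/gtex_rnaseq.tsv")   // placeholder path

// Even this simple summary over all columns can take hours or lose executors,
// while pandas handles the same data in minutes.
wide.describe().show()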

Sunday, 23 December 2018

SCD Type 2 in Hive


truncate table cust_income;
truncate table cust_income_intmdt;
truncate table cust_income_target;


insert into table cust_income values
(1,'P','100000','20000','120000','1-2-2016'),
(2,'P','120000','20000','140000','1-2-2016'),
(3,'P','130000','20000','150000','1-2-2016');


insert into cust_income_intmdt
-- 1) carry forward history rows that are already closed (end_dt populated)
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.start_dt, x.end_dt from cust_income_target x where end_dt is not null
union all
-- 2) carry forward currently open target rows; where a matching incoming row exists,
--    close them by setting end_dt to the incoming load timestamp
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.start_dt, y.dw_ld_ts end_dt from cust_income_target x
left outer join
cust_income y on (x.cust_id = y.cust_id) where x.end_dt is null
union all
-- 3) add every incoming row as a new open version (start_dt = load timestamp, end_dt = null)
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.dw_ld_ts start_dt, null end_dt from cust_income x ;

truncate table cust_income_target;

insert into cust_income_target
select * from cust_income_intmdt;

truncate table cust_income_intmdt;

truncate table cust_income;

insert into table cust_income values
(1,'P','110000','20000','120000','2-2-2016'),
(2,'P','130000','20000','140000','2-2-2016'),
(4,'P','130000','20000','150000','2-2-2016');

insert into cust_income_intmdt
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.start_dt, x.end_dt from cust_income_target x where end_dt is not null
union all
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.start_dt, y.dw_ld_ts end_dt from cust_income_target x
left outer join
cust_income y on (x.cust_id = y.cust_id) where x.end_dt is null
union all
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.dw_ld_ts start_dt, null end_dt from cust_income x ;

truncate table cust_income_target;

insert into cust_income_target
select * from cust_income_intmdt;

truncate table cust_income_intmdt;

truncate table cust_income;

insert into table cust_income values
(4,'P','120000','20000','120000','3-2-2016'),
(2,'P','140000','20000','140000','3-2-2016'),
(5,'P','150000','20000','150000','3-2-2016');

insert into cust_income_intmdt
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.start_dt, x.end_dt from cust_income_target x where end_dt is not null
union all
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.start_dt, y.dw_ld_ts end_dt from cust_income_target x
left outer join
cust_income y on (x.cust_id = y.cust_id) where x.end_dt is null
union all
select x.cust_id, x.person_org_code, x.annual_net_inc, x.total_tax, x.est_gross_inc, x.dw_ld_ts, x.dw_ld_ts start_dt, null end_dt from cust_income x ;

truncate table cust_income_target;

insert into cust_income_target
select * from cust_income_intmdt;

truncate table cust_income_intmdt;

Monday, 13 November 2017

Abinitio on unix to sqlserver on windows conn requirements


This article details the steps required to connect from Abinitio on Linux to SQL Server on Windows, where the SQL Server is Kerberos enabled.

Step 1:
Install the 64-bit unixODBC drivers (unixODBC-2.3.0).
Once installed, check the path below:
/apps/abinitio/abinitio-V3-3-2-1/misc/64/unixODBC-2.3.0/

Step 2:
Install mssqlodbc_V11.
Once installed, check the path below:
/apps/sqlserver/client/native/11.0/lib/

Step 3:
/etc/krb5.conf should contain:

---------------------------------------------------------------
######################################################
# krb5.conf - Network Configuration File
######################################################

[libdefaults]
         default_realm = INTRANET.abc.COM
         default_tkt_enctypes = xxx-xx-xxx xx-xxxx
         default_tgs_enctypes = xxx-xx-xxx xx-xxxx
         dns_lookup_kdc = false
         dns_lookup_realm = false
[realms]
        INTRANET.abc.COM = {
                kdc = intranet.abc.com
                master_kdc = intranet.abc.com
                default_domain = INTRANET.abc.COM
        }


        INTRANET = {
                kdc = intranet.abc.com
                master_kdc = intranet.abc.com
                default_domain = intranet.abc.com
        }
[domain_realm]
                .intranet.abc.com = INTRANET.abc.COM
                intranet.abc.com = INTRANET.abc.COM

-------------------------------------------------------------------------------------

Step 4:

A request has to be raised to make the SQL Servers Kerberos enabled (to authenticate cross-platform clients), and the SPNs have to be registered.


Step 5:

Test the connection using the SQL Server client (which gets installed as part of Step 2):
cd /apps/sqlserver/client/native/11.0/bin/

Example command:
sqlcmd -E -S servername,portno -d master -q "select name from sys.databases"
-E : trusted connection (takes the credentials from the keytab); no need to give the username and password explicitly
-S : server name
-d : database name
-q : query

If this does not work, change /apps/sqlserver/client/native/11.0/krb5.conf as described in Step 3.


Author : Venkata Krishnan P

How to run hadoop (pseudo mode) on windows


Here is a link that details how to run Hadoop on Windows. You can make use of Eclipse and in fact debug MapReduce applications straight from your desktop. Hadoop will run in pseudo mode; I have tested this on a 3 GB RAM machine and it worked absolutely fine.

Please closely follow the steps mentioned in the link below.




Wednesday, 30 August 2017

Choose your data storage format in the Hadoop Ecosystem


Shall I go with Text, Avro, ORC, Sequence, or Parquet format? Believe me, it all depends on the type and size of the data you are working with.

Tools Compatibility:
Sometimes the choice depends on the compatibility of the tools you are working with.
Impala does not understand ORC --> then choose Parquet or RC format.

Memory Perspective:
1. Parquet/ORC with snappy compression downsizes the file to almost a quarter of its original size (a write sketch follows this list).
2. Avro with deflate compression downsizes the files to almost one fourth.
3. Some compression codecs make the file non-splittable, which kills the very purpose of HDFS.
4. Sequence, Avro, Parquet, and ORC offer splittability regardless of the compression codec.
5. If you go with text or CSV format, parsing overhead compromises retrieval time.
6. Sequence file format is mainly designed to exchange data between MR jobs.
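
A minimal write sketch, assuming Spark with Scala (paths are placeholders; the roughly one-quarter size reduction mentioned above is this post's observation, not a guarantee):

// Placeholder input: a raw CSV/text dataset.
val df = spark.read.option("header", "true").csv("/data/raw_csv")

// Columnar + snappy: splittable and typically much smaller than the raw text.
df.write.option("compression", "snappy").parquet("/data/parquet_snappy")

// The same data as ORC with snappy, also splittable.
df.write.option("compression", "snappy").orc("/data/orc_snappy")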

Querying Speed:
Columnar data formats like Parquet and ORC offer an advantage in querying speed when you have many columns but only need a few of them for your analysis, since they read only the required columns. However, that advantage is lost if you still need all the columns, for example in search use cases; there, decide based on your use case.
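
A sketch of that column-pruning advantage, assuming Spark with Scala (the path and column names are illustrative):

// Only the requested columns are read from the Parquet column chunks;
// a text/CSV source would have to parse every full row.
val fewCols = spark.read
  .parquet("/data/parquet_snappy")          // placeholder path
  .select("cust_id", "annual_net_inc")      // illustrative column names

fewCols.show(10)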

Schema Evolution 
Schema evolution is about changes to the underlying file structure, for instance the data type of a column changing, or columns being added, removed, or altered.
Text file: does not store the schema.
Parquet/Avro: store the schema.
Parquet: only allows the addition of new columns at the end and does not handle removal of columns.
Avro: is quite generous; it allows addition, deletion, and renaming of multiple columns.

Now choose the format based on your project's nature from a schema evolution perspective.
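
As a hedged illustration of the add-columns case with Parquet, here is a Spark (Scala) sketch using the mergeSchema read option rather than Hive DDL; the paths and columns are placeholders:

import spark.implicits._

// Old files without the new column, newer files with it (placeholder paths).
Seq((1, "P")).toDF("cust_id", "person_org_code")
  .write.mode("overwrite").parquet("/data/income/v1")
Seq((2, "P", 120000)).toDF("cust_id", "person_org_code", "annual_net_inc")
  .write.mode("overwrite").parquet("/data/income/v2")

// mergeSchema reconciles the two schemas; the column missing from the old
// files simply comes back as null for those rows.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/income/v1", "/data/income/v2")

merged.printSchema()
merged.show()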

Scenarios:
If your use case is to write fast and you seldom query the huge datasets, go for text format.
If your use case is to retrieve the data fast, go for a columnar format; here write time is compromised due to some extra processing.
If your schema keeps evolving, go for Avro.


I performed tests on the five data formats below against Hive and Impala:
– Text
– Sequence
– Avro
– Parquet
– ORC

See the URL below for the results of the above tests:
http://www.svds.com/dataformats/







Sunday, 28 May 2017

SCD type without duplicates

SCD type without duplicates, used in Hive due to Hive's immature update functionality. A new record version is written only when the income value actually changes.

truncate table cust_income;
truncate table cust_income_intmdt;
truncate table cust_income_target;


insert into table cust_income values
(1,'P','100000','20000','120000','1-2-2016'),
(2,'P','120000','20000','140000','1-2-2016'),
(3,'P','130000','20000','150000','1-2-2016');


insert into cust_income_intmdt
-- 1) carry forward history rows that are already closed
select tgt.cust_id, tgt.person_org_code, tgt.annual_net_inc, tgt.total_tax, tgt.est_gross_inc, tgt.dw_ld_ts, tgt.start_dt, tgt.end_dt from cust_income_target tgt where tgt.end_dt is not null
union all
-- 2) open target rows whose annual_net_inc differs from the incoming value are closed
--    with the source load timestamp (NVL substitutes 1 when there is no matching source row)
select tgt.cust_id, tgt.person_org_code, tgt.annual_net_inc, tgt.total_tax, tgt.est_gross_inc, tgt.dw_ld_ts, tgt.start_dt, src.dw_ld_ts end_dt from cust_income_target tgt
left outer join
cust_income src on (tgt.cust_id = src.cust_id) where tgt.end_dt is null and (tgt.annual_net_inc/NVL(src.annual_net_inc,1))<> 1
union all
-- 3) add source rows as new open versions only when the income differs from the current
--    target value (brand new cust_ids have no target match, so they also qualify)
select src.cust_id, src.person_org_code, src.annual_net_inc, src.total_tax, src.est_gross_inc, src.dw_ld_ts, src.dw_ld_ts start_dt, null end_dt from cust_income src
left outer join
cust_income_target tgt on (src.cust_id = tgt.cust_id) where (src.annual_net_inc/NVL(tgt.annual_net_inc,1))<> 1;

truncate table cust_income_target;

insert into cust_income_target
select * from cust_income_intmdt;

truncate table cust_income_intmdt;

truncate table cust_income;

insert into table cust_income values
(1,'P','110000','20000','130000','2-2-2016'),
(2,'P','130000','20000','150000','2-2-2016'),
(4,'P','130000','20000','150000','2-2-2016'),
(3,'P','130000','20000','170000','1-2-2017');

insert into cust_income_intmdt
select tgt.cust_id, tgt.person_org_code, tgt.annual_net_inc, tgt.total_tax, tgt.est_gross_inc, tgt.dw_ld_ts, tgt.start_dt, tgt.end_dt from cust_income_target tgt where tgt.end_dt is not null
union all
select tgt.cust_id, tgt.person_org_code, tgt.annual_net_inc, tgt.total_tax, tgt.est_gross_inc, tgt.dw_ld_ts, tgt.start_dt, case when(tgt.annual_net_inc/NVL(src.annual_net_inc,1))<> 1 then src.dw_ld_ts else tgt.end_dt end end_dt from cust_income_target tgt
left outer join
cust_income src on (tgt.cust_id = src.cust_id) where tgt.end_dt is null
union all
select src.cust_id, src.person_org_code, src.annual_net_inc, src.total_tax, src.est_gross_inc, src.dw_ld_ts, src.dw_ld_ts start_dt, null end_dt from cust_income src
left outer join
cust_income_target tgt on (src.cust_id = tgt.cust_id) where (src.annual_net_inc/NVL(tgt.annual_net_inc,1))<> 1;

truncate table cust_income_target;

insert into cust_income_target
select * from cust_income_intmdt;

truncate table cust_income_intmdt;

truncate table cust_income;

insert into table cust_income values
(4,'P','130000','20000','150000','3-2-2017'),
(2,'P','140000','20000','160000','3-2-2016'),
(5,'P','150000','70000','170000','3-2-2016');

insert into cust_income_intmdt
select tgt.cust_id, tgt.person_org_code, tgt.annual_net_inc, tgt.total_tax, tgt.est_gross_inc, tgt.dw_ld_ts, tgt.start_dt, tgt.end_dt from cust_income_target tgt where tgt.end_dt is not null
union all
select tgt.cust_id, tgt.person_org_code, tgt.annual_net_inc, tgt.total_tax, tgt.est_gross_inc, tgt.dw_ld_ts, tgt.start_dt, case when(tgt.annual_net_inc/NVL(src.annual_net_inc,1))<> 1 then src.dw_ld_ts else tgt.end_dt end end_dt from cust_income_target tgt
left outer join
cust_income src on (tgt.cust_id = src.cust_id) where tgt.end_dt is null
union all
select src.cust_id, src.person_org_code, src.annual_net_inc, src.total_tax, src.est_gross_inc, src.dw_ld_ts, src.dw_ld_ts start_dt, null end_dt from cust_income src
left outer join
cust_income_target tgt on (src.cust_id = tgt.cust_id) where (src.annual_net_inc/NVL(tgt.annual_net_inc,1))<> 1;

truncate table cust_income_target;

insert into cust_income_target
select * from cust_income_intmdt;

truncate table cust_income_intmdt;
