How to Solve the Real Big Data Problems – Load Huge Excel Files

Original article was published on Artificial Intelligence on Medium

Step-1: Set-up for the project

We have a pretty simple setup for this project: just one directory and one transformation file.

I prefer to create all the work-related projects in one single project directory named ‘Work’; I know, how creative! We need to perform the steps below; if you already have a similar structure in place, you can skip this step.

  1. Create our project directory – LoadData.
  2. Create a directory ‘Input’ within the project directory.
  3. Create an empty Transformation named ‘Main.ktr’ within the project directory.
Project Directory Structure

If you are not familiar with terms like transformation or job, then I recommend the below-mentioned story.

Step-2: Create Database Table

I am assuming that you already have the database installed; we are using PostgreSQL here.

Now, I prefer to create tables using Django models. You don’t necessarily have to use this methodology.

Having said so, writing Django models makes our life easier than manually creating tables and columns natively: Django generates them for us with a simple migrations command, and we also get CRUD (create, read, update and delete) functionality out of the box.

You can choose either of the two options mentioned below to create the table. I have created mine in a database named medium_db.

  • PostgreSQL create script.
-- Table: public.economic_data

-- DROP TABLE public.economic_data;

CREATE TABLE public.economic_data
(
    id integer NOT NULL DEFAULT nextval('economic_data_id_seq'::regclass),
    series_reference character varying(255) COLLATE pg_catalog."default" NOT NULL,
    indicator_name character varying(255) COLLATE pg_catalog."default" NOT NULL,
    period character varying(45) COLLATE pg_catalog."default" NOT NULL,
    indicator_value numeric(30,10) NOT NULL,
    status character varying(255) COLLATE pg_catalog."default" NOT NULL,
    indicator_unit character varying(255) COLLATE pg_catalog."default" NOT NULL,
    group_name character varying(255) COLLATE pg_catalog."default" NOT NULL,
    series_name character varying(255) COLLATE pg_catalog."default",
    CONSTRAINT economic_data_pkey PRIMARY KEY (id)
)
TABLESPACE pg_default;

ALTER TABLE public.economic_data
    OWNER to YOURUSER;
  • Django Model script to run migrations.
from django.db import models

# Create your models here.
class EconomicData(models.Model):
    series_reference = models.CharField(
        db_column="series_reference",
        max_length=255,
        help_text="Unique code to identify a particular record",
        verbose_name="Series Reference",
    )
    indicator_name = models.CharField(
        db_column="indicator_name",
        max_length=255,
        verbose_name="Name of the indicators",
    )
    period = models.CharField(
        db_column="period",
        max_length=45,
        verbose_name="Period",
    )
    indicator_value = models.DecimalField(
        db_column="indicator_value",
        max_digits=30,
        decimal_places=10,
        verbose_name="Value of the Field",
    )
    status = models.CharField(
        db_column="status",
        max_length=255,
        verbose_name="Status of the value. For eg, Final or Revised",
    )
    indicator_unit = models.CharField(
        db_column="indicator_unit",
        max_length=255,
        verbose_name="Unit of the indicators",
    )
    group_name = models.CharField(
        db_column="group_name",
        max_length=255,
        verbose_name="Group of the indicators",
    )
    series_name = models.CharField(
        db_column="series_name",
        max_length=255,
        verbose_name="Series of the indicators",
        null=True,
    )

    def __str__(self):
        return f"{self.indicator_name} - {self.indicator_value}"

    class Meta:
        db_table = "economic_data"
        verbose_name = "Economic Data"
        verbose_name_plural = "Economic Data"

If you are interested in understanding how we can benefit from Django in our data pipeline, then please do let me know the same in the response section below.

We have our table ready. Now, let’s create our transformation.

Step-3: Loader Transformation

We need to create a loader transformation that reads our input CSV, performs the manipulations, and loads the data into the database.

We need to open our Main.ktr file and drag in the plugins mentioned below.

Step-1: Drag Steps

  1. Firstly, let’s add a small description of the transformation. Documentation is key for any data pipeline.
  2. Drag ‘CSV file input’, ‘Data grid’, ‘Join rows (cartesian product)’, ‘User defined Java expression’, ‘Filter rows’, ‘Text file output’, ‘Table Output’ plugins from the design tab onto the canvas.
  3. Rename the fields as per our naming convention.
Main transformation after renaming

Step-2: Configure Properties

  1. We need to configure the properties for each of the above-mentioned steps. Let’s configure our CSV input step first: browse for the input file in the Filename field and click on Get Fields. We can tweak the NIO buffer size as per our memory availability; it will process the file in batches of 50K records each.
  2. We need to add data in the Data Grid (replicating a table here). Here, we are using a data grid for example data. In a real-world scenario, you will get this data from some dimension table. We are standardizing the Group Names. You can refer to the below screenshot for the data. We need to add column names in the Meta tab and actual data in the Data tab.
  3. In the Join rows step, we need to map the fields that we want from the input with our dimension table/grid. Since we are mapping groups here, we need to add the Condition to map the same.
  4. In the User defined Java expression step, we will configure a custom row-level condition. We need to define our New field as ‘series_reference_flag’; here we want to take the ‘Series_reference’ field and append ‘_R’ if the status column is ‘Revised’. In our Java expression, we add the following condition: status == "Revised" ? Series_reference + "_R" : Series_reference. This is plain Java code, so we can perform similar conditions or calculations. Powerful! Lastly, we need to set the Value type to ‘String’.
  5. In the Filter rows step, we need to define our condition of passing records without null values.
  6. In the Text file output (error report) step, we need to set the Filename to ${Internal.Entry.Current.Directory}/error_report and change the Extension to ‘CSV’.
  7. In the Table output step, we need to create a new Connection to our database. We can connect to any database as per our requirements; here we connect to PostgreSQL (refer to the screenshot for connection details). We need to set the Target table to ‘economic_data’, check the Specify database fields option, and then map the input/transformation fields to the table fields.
Input Step Configurations
Data Grid Configurations – Dimension
Join rows Configurations
Condition Configurations
Filtering Null Configurations
Database Output Configurations
Database Mapping
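Taken together, the row-level logic configured above (the join, the revised-flag expression, and the null filter) can be sketched in plain Python. This is a rough illustration, not the PDI engine: the column names follow the table from Step-2, and the Data Grid dimension is reduced to a simple dict with made-up values.

```python
import csv
import io

# Stand-in for the Data Grid dimension: raw group -> standardized group name.
# The mapping values here are hypothetical.
GROUP_DIM = {"CPI": "Consumer Price Index"}

def transform(rows, group_dim):
    """Apply the dimension join, the revised flag, and the null filter per row."""
    good, errors = [], []
    for row in rows:
        # Join rows (cartesian product) -> here a keyed lookup on group_name.
        row["group_name"] = group_dim.get(row["group_name"], row["group_name"])
        # User defined Java expression: append "_R" when status is "Revised".
        row["series_reference_flag"] = (
            row["series_reference"] + "_R"
            if row["status"] == "Revised"
            else row["series_reference"]
        )
        # Filter rows: route records with missing mandatory fields to the error file.
        if all(row.get(f) for f in ("series_reference", "indicator_name", "period")):
            good.append(row)
        else:
            errors.append(row)
    return good, errors

sample = io.StringIO(
    "series_reference,indicator_name,period,status,group_name\n"
    "S1,GDP,2020.06,Final,CPI\n"
    "S2,GDP,2020.06,Revised,CPI\n"
)
good, errors = transform(csv.DictReader(sample), GROUP_DIM)
```

Here the second row comes out with series_reference_flag set to ‘S2_R’, exactly what the Java expression in the transformation produces.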

Step-4: Let’s Speed Up the Process

Now that we have configured the properties, we can speed up the process by creating multiple threads for inserting data. This will boost the performance. PDI provides us with the facility to configure multi-threading per step. If we use it on an input step, it will multiply the records; whereas if we use it on output steps like the database, it will distribute the records.
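The distribution behaviour for output steps can be pictured like this; a toy round-robin illustration, not PDI internals:

```python
def distribute(rows, copies):
    """Round-robin rows across step copies, as PDI does for multi-copy output steps."""
    lanes = [[] for _ in range(copies)]
    for i, row in enumerate(rows):
        lanes[i % copies].append(row)
    return lanes

# Ten records distributed across two output copies: five rows each.
lanes = distribute(list(range(10)), 2)
```

Run the same function on an input step instead and every copy would re-read all ten rows, which is why PDI multiplies records there.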

PDI provides us with many options to optimize the performance. We can perform the below steps to boost the performance.

  1. Change the NIO buffer size in our input step to define our batch size.
  2. Change Max. cache size (in rows) in the lookup step to define the number of rows it stores in cache memory instead of querying the database.
  3. Change Commit Size; similar to the buffer size, this sets the batch size for inserting records.
  4. Use multiple threads to perform the activity: we can add a Dummy step before our output step, right-click it, and select Change Number of Copies to Start, raising it from 1 to anything above 1 as per our requirements.
  5. Please note, if we want to run multiple threads on the table output step itself, then we cannot use point number four. We will instead have to add a Dummy step before the output and distribute the records across multiple table output steps.
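The Commit Size idea (point three) amounts to grouping inserts into batches and committing once per batch. Here is a minimal sketch using Python’s built-in sqlite3 module; the article targets PostgreSQL, where the driver code would look much the same, and the two-column table is a simplification of economic_data.

```python
import sqlite3
from itertools import islice

def load_batched(conn, rows, commit_size=1000):
    """Insert rows in batches, committing once per batch (PDI's Commit Size)."""
    cur = conn.cursor()
    it = iter(rows)
    while batch := list(islice(it, commit_size)):
        cur.executemany(
            "INSERT INTO economic_data (series_reference, indicator_value)"
            " VALUES (?, ?)",
            batch,
        )
        conn.commit()  # one commit per batch, not per row

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE economic_data (series_reference TEXT, indicator_value REAL)"
)
load_batched(conn, [("S1", 1.0), ("S2", 2.0), ("S3", 3.0)], commit_size=2)
```

Fewer commits means less transaction overhead per record, which is where most of the speed-up for bulk inserts comes from.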

Wow! We have so many options; should we apply all of the performance optimizations? The short answer is NO. We need to try with sample data and run multiple tests to find what works best for us.

Step-5: Evaluation

Let’s run the flow without the performance optimizations and then compare it against the optimized run.

Success
Step Matrix – It took 8m 42s
Added simple optimizers: 20X on Condition and two threads for output
Step Matrix — It took 4m 35s

We reduced the time taken to perform the same activity by almost 50% by adding some simple performance optimizations.

Conclusion

We took a problem statement, solved it using multiple approaches, and optimized it as well. You can apply this process to fit your requirements and try to optimize it further. PDI also provides a PostgreSQL bulk loader step; I have tried that step as well. However, it did not provide any significant performance boost.

We cannot fully optimize the pipeline at the beginning and will have to perform multiple tests to get the ideal results. However, to shorten the learning curve you can always read through my experiences and find solutions to these problem statements by subscribing to my email list using the below link.

See you in the next post. Happy ETL!