Slice Data Lake

Every data-driven company starts out managing its data in SQL and NoSQL databases. As the volume grows, running complex analysis on those underlying stores becomes time-consuming. With technology being consumed at today’s pace, data grows by leaps and bounds, and that forces a shift towards handling and analysing Big Data, which legacy SQL and NoSQL databases can’t do easily.

It goes without saying that such was our case too! Hence we started building our own Data Lake on streaming data, a Python-Spark ETL process and AWS Athena, which is based on the Presto engine. We thought of writing this blog to explain our approach and way of thinking to the world, so hold on to your mouse and keep scrolling.

“In God we trust. All others must bring data” — No, just kidding! 

Idea / Motivation

We at slice started building our tools (Mobile app and backend infra) back in 2015/16. We were small back then, trying to figure out the ways to engage our target group of young professionals and students with our idea of providing them a credit line based on their creditworthiness.

Initially, we built our systems on a NoSQL DB because it is easy to use and, thanks to its flexible schema, makes modifying data/documents easy. With a small consumer base, it was also easy to use for analytics, up to a certain scale, to build our credit policies and risk models.

Over time, as we grew by leaps and bounds, our customer base grew with us. The NoSQL DB was still good enough for the Mobile app functionality, but it couldn’t easily be used for analytical use cases. So we decided to start using Redshift, the AWS analytical DB engine. This appeared to be the right choice at the time, but with our growth and expansion plans, coupled with an influx of new customers, the data needed for analytical use cases started growing exponentially. Just to give you an idea: in terms of volume, the data we received in 2016-2017 is nothing compared to the data we now receive every month.

AWS Redshift is nice for analytical use cases, but it demands a lot of technical expertise and comes with restrictions, namely:

  1. The sort and distribution keys for any new table have to be defined by thinking through every query pattern the table will be used in, including patterns that may only appear in the future.
  2. The queries are fast if every best practice is followed, but that again requires expertise in DB design, clustering, etc.
  3. Given current trends in data growth and usage, nothing remains constant forever. The table schema that appears best for now may not be good enough for future querying and analytical needs.

Similar was the case for us! The overhead of maintaining the tables and keeping the data distribution sane, the increase in query times and the data’s exponential growth led us to think of creating our own Data Lake.

We process a significant amount of data to make sure our policies are in line with the current market conditions, RBI guidelines, and special situations like the coronavirus outbreak, the moratorium on interest, etc.

So, for our Data Lake, we chose S3 + Athena, which is comparatively faster and cheaper in terms of storage and accessibility for analytical processes. It can query TBs of data within seconds, and you pay only for the data scanned by each query, with none of the cost of running and managing infrastructure that DB engines carry.

Architecture

Explanation

This architecture was proposed to achieve the following key results:

  1. Prepare a Data Lake: a source of data that can be used for analytical purposes and is easily accessible and queryable.
  2. Reduce the amount of data stored in MongoDB that is not relevant for real-time processing.
  3. Reduce the overall cost incurred on MongoDB for storing, indexing and accessing the data.
  4. Set up a process to transform the data into a ready-to-digest form, so that information and facts are readily available.
  5. Do real-time aggregation on the streaming data so that the backend APIs can consume ready-made data from RDS, reducing backend API response times as a result.

The backend server is responsible for dumping the data into both Mongo and S3.

AWS Kinesis dumps the data into an S3 bucket, which we use as an unprocessed datastore. The data is dumped periodically, in near real-time, and remains in the raw format in which it was received from the devices. Apart from this, AWS Kinesis Firehose uses AWS Lambda to transform the real-time stream data and dump it into AWS RDS for backend API consumption.
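
To make the Firehose-plus-Lambda step more concrete, here is a minimal sketch of a Firehose transformation handler. Firehose hands the Lambda a batch of base64-encoded records and expects each one back with its recordId, a result status and re-encoded data. The payload fields (user_id, event_time) and the transform_record helper are hypothetical, and the write to RDS is left out; this is not our production code.

```python
import base64
import json


def transform_record(payload: dict) -> dict:
    """Hypothetical transformation: keep only the fields the APIs need."""
    return {
        "user_id": payload.get("user_id"),
        "event_time": payload.get("event_time"),
    }


def lambda_handler(event, context):
    """Transform each Firehose record and report a per-record result."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            transformed = transform_record(payload)
            # Newline-delimited JSON keeps records separable downstream.
            data = base64.b64encode(
                (json.dumps(transformed) + "\n").encode("utf-8")
            ).decode("utf-8")
            output.append(
                {"recordId": record["recordId"], "result": "Ok", "data": data}
            )
        except (ValueError, AttributeError):
            # Malformed input: hand the record back untouched, marked as failed.
            output.append(
                {
                    "recordId": record["recordId"],
                    "result": "ProcessingFailed",
                    "data": record["data"],
                }
            )
    return {"records": output}
```

Returning one entry per incoming recordId matters: Firehose treats a missing record as a failure of the whole transformation.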

The data from the unprocessed data store goes through an ETL process on AWS EMR or AWS Glue, which processes and partitions it as required for analysis. The same process reduces unwanted clutter and duplication by comparing the unprocessed data with the data already in the data lake.

The data in MongoDB is kept with a TTL of 45 or 60 days, depending on the use case, which helps us keep MongoDB’s growth in check.

Data partitions

Our use cases mainly revolve around the user’s UUID and the time at which the data was captured. Accordingly, we have for now finalised the partitions for each dataset based on the use cases of the different collections.
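
As an illustration of partitioning by capture time and user UUID, the sketch below builds a Hive-style S3 prefix, which is the layout Athena can map to partitions. The partition names (dt, user_uuid), their order and the dataset name are assumptions made for the example, not our actual layout.

```python
from datetime import datetime, timezone


def partition_prefix(dataset: str, user_uuid: str, captured_at: datetime) -> str:
    """Build a Hive-style prefix: <dataset>/dt=<UTC date>/user_uuid=<uuid>/."""
    # Normalise to UTC so one event always lands in the same date partition.
    day = captured_at.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return f"{dataset}/dt={day}/user_uuid={user_uuid}/"
```

For example, partition_prefix("events", "abc", datetime(2020, 5, 1, 12, 0, tzinfo=timezone.utc)) gives "events/dt=2020-05-01/user_uuid=abc/". Queries that filter on these columns then only scan the matching prefixes.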

Data storage

The data is converted to Parquet format with snappy compression, which gives us up to 8x size reduction in S3. This results in low storage cost and also a low cost for accessing data, since access is charged by size.

Besides, with the data in Parquet, querying is much faster and easier due to the columnar nature of the format. This effectively gives you the power to query on any particular field as if it were a primary or sort key in itself.
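
The effect of compression and columnar reads on a pay-per-scan bill can be worked out with simple arithmetic. The function below is a rough sketch: the price per TB is an input you supply, not a quoted AWS price, and the fraction of columns read is a simplification of how a columnar format skips unneeded fields.

```python
TIB = 1024 ** 4  # bytes in one tebibyte


def scan_cost(raw_bytes: float, compression_ratio: float,
              fraction_of_columns_read: float, price_per_tb: float) -> float:
    """Estimate the cost of one query that is billed by bytes scanned."""
    # Compression shrinks what is stored; columnar reads shrink what is touched.
    scanned_bytes = raw_bytes / compression_ratio * fraction_of_columns_read
    return scanned_bytes / TIB * price_per_tb
```

With 8 TB of raw data, the 8x compression mentioned above, half the columns read and an assumed price of 5.0 per TB, scan_cost(8 * TIB, 8, 0.5, 5.0) comes to 2.5, against 40.0 for scanning the raw data in full.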

Outcomes

  • Parquet with snappy compression gives us up to 8x size reduction in S3, lowering both the storage cost and the cost of accessing data by size.
  • Moving data out of Mongo to our data lake resulted in 65-70% lower data storage and transfer costs, which also helped us reduce the instance size requirement.
  • The current data storage and transfer cost, and the data backup cost, for Mongo are reduced by 70-75%.
  • With less data stored in MongoDB and analytics depending less on it, we no longer need a heavy Atlas AWS instance configuration, which saved a further ~50% in infrastructure cost.
  • Previously, data had to be downloaded from Mongo and converted into a consumable format for reporting or analysis. The Data Lake has automated this process and reduced the time involved, resulting in faster analysis.
  • The internal Product API servers no longer depend on MongoDB data at all: the incoming sync data is transformed in real time and made available in AWS RDS, where the APIs use it directly without performing any complex calculation themselves.

Future Scope

  • Creating a highly scalable Data sync engine: a project that caters to the data sync and data lake use cases, allowing the Slice Apps backend servers to serve user requests better, without sync processes hogging our current backend server infrastructure.

So, that was all about our thought process behind the Data Lake we built. Here at slice, we are continually building, developing and optimizing our systems. So, stay tuned for more such technical scoops!

Jarvis: Slack notification framework

Do you run crons regularly and worry about their progress? Or are you simply curious whether they’ll run perfectly? Maybe you want to store logs on your server and send part of those logs, or the entire log file, to yourself in case of an exception, rather than wade through a long list of logs in Elasticsearch / CloudWatch. Or are you looking for a convenient way to send stats (in the form of a JPEG or CSV) from your server to your team on Slack? Either way, Jarvis is the perfect tool for you!

Be it sending notifications on your dedicated slack channel and tagging the entire channel / yourself or just sending a message with or without any attachment, Jarvis does the deed.

It is a dedicated tool that can be integrated with your server/cron and sends a slack notification to a given channel name according to your requirement. All you need to do is add Jarvis to your channel and invoke a simple lambda function with the required parameters, and you will receive your notifications.

How Jarvis helped to smoothen things out at slice!

We, as a data team, run several crons for migrations and data updates in databases. So, quite naturally, we wanted to be kept informed whenever a cron execution completed or any of the crons hit an exception. Hence, Jarvis was created for smooth transmission of notifications. In case of an exception, we wanted to inform everyone in the team immediately, so that the team could act on it at the earliest.

Finally, the requirement that ultimately motivated us to create Jarvis was that we wanted to send an attachment, such as a CSV or log file, with our notifications.

So, in short, the main aim behind creating Jarvis was:

“To create a centralized tool that can be used by different services and codebases to send a notification to slack for different actions with or without an attachment”

Architecture

The basic architecture of Jarvis is pretty simple. Here’s a run-through:

The whole codebase of Jarvis runs in an AWS Lambda function. If any server/cron wants to send a notification, the flow is as follows.

  1. If the user wants to send a notification without any attachment
    1. The user calls the lambda function with the required payload, and a slack notification is sent to the given channel name, provided that the Jarvis app is added to that slack channel.
  2. If the user wants to send a notification with an attachment
    1. The user uploads the required document from their machine to a predefined S3 bucket (in the preferred format, i.e., the key should be /service_name/ file).
    2. Next, they call the lambda function with the required payload, and a slack notification is sent to the given channel name, provided that the Jarvis app is added to that slack channel.
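
The attachment flow above (upload first, then invoke) can be wrapped in one helper. This is a minimal sketch, not part of Jarvis itself: notify_with_attachment is a hypothetical name, the clients are passed in (for example boto3.client("s3") and boto3.client("lambda")), and the <service_name>/<file name> key layout is an assumption based on the preferred format mentioned above.

```python
import json
import os

ATTACHMENT_BUCKET = "slack-notification-framework-attachments"
NOTIFIER_FUNCTION = "slice-slack-notifier"


def notify_with_attachment(s3_client, lambda_client, local_path, service_name,
                           slack_channel, message, heading=None):
    """Upload a local file to the attachment bucket, then invoke Jarvis."""
    # Assumed key layout: <service_name>/<file name>.
    key = f"{service_name}/{os.path.basename(local_path)}"
    s3_client.upload_file(local_path, ATTACHMENT_BUCKET, key)

    payload = {
        "message": message,
        "slack_channel": slack_channel,
        "attachment": {"bucket": ATTACHMENT_BUCKET, "key": key},
    }
    if heading is not None:
        payload["heading"] = heading  # heading is optional

    return lambda_client.invoke(
        FunctionName=NOTIFIER_FUNCTION,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )
```

Passing the clients in keeps the upload-then-invoke order in one place and makes the helper easy to exercise with stand-in clients.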

How to add Jarvis to your channel:

  1. Go to your channel.
  2. Click on add app.
  3. Select Jarvis.

Without adding the Jarvis app to your channel, you cannot send a notification to that channel.

How to invoke lambda function with custom payload

The lambda can be invoked with a custom payload, with or without an attachment. Here’s how:

  • Without attachment

To send a notification, invoke the lambda with the following payload:
{
    "heading": "Heading",
    "message": "Message",
    "slack_channel": "slice-lambda-notifications"
}

Here heading is optional, while message and slack_channel are compulsory parameters. slack_channel is the name of the channel where you want to send the notification.

  • With attachment

To send an attachment (CSV, jpeg) using Jarvis, follow these steps:

  1. Your code or product should upload the attachment to S3, in the bucket slack-notification-framework-attachments, before triggering the notification.
  2. As a good practice, please upload your file to
    s3://slack-notification-framework-attachments/<channel name>/<attachment file>.<file format>
  3. Invoke the lambda using the following payload:
{
    "heading": "Heading",
    "message": "Message",
    "slack_channel": "slice-lambda-notifications",
    "attachment": {
        "bucket": "slack-notification-framework-attachments",
        "key": "testing/test_file.csv"
    }
}
Here heading is optional, while attachment, message and slack_channel are compulsory. slack_channel is the name of the channel where you want to send the notification.
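
Since a malformed payload only surfaces as an error notification after the call, it can help to check it on the caller’s side first. The sketch below encodes the rules stated above (message and slack_channel compulsory, heading optional, attachment needing both bucket and key); validate_payload is a hypothetical helper, not part of Jarvis.

```python
def validate_payload(payload: dict) -> list:
    """Return a list of problems found in a Jarvis payload (empty if valid)."""
    errors = []
    for field in ("message", "slack_channel"):
        value = payload.get(field)
        if not isinstance(value, str) or not value.strip():
            errors.append(f"'{field}' is required and must be a non-empty string")

    if "heading" in payload and not isinstance(payload["heading"], str):
        errors.append("'heading' must be a string when provided")

    if "attachment" in payload:
        attachment = payload["attachment"]
        if not isinstance(attachment, dict):
            errors.append("'attachment' must be an object with 'bucket' and 'key'")
        else:
            for field in ("bucket", "key"):
                value = attachment.get(field)
                if not isinstance(value, str) or not value:
                    errors.append(f"'attachment.{field}' is required")
    return errors
```

A caller can refuse to invoke the lambda whenever the returned list is non-empty and log the messages instead.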

How to involve the whole channel/user in the slack notification

If you want to send a notification that tags the whole channel, prepend <!channel> to the value of the message key in the payload.

Example:

If your message key is:  “Failure in deleting file”

And you want to tag the whole channel, send the message key as: <!channel>  Failure in deleting file

If you want to send a notification that tags a particular user, prepend <@UserId> to the value of the message key in the payload.

Example:

If your message key is:  “Failure in deleting file”

And you want to tag the person,  send the message key as: <@UNKDY50FH>  Failure in deleting file
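
The two tagging rules are easy to wrap in small helpers so that callers don’t have to remember the mention syntax. The function names are hypothetical; the mention formats are the ones shown in the examples above.

```python
def tag_channel(message: str) -> str:
    """Prefix the message with a mention of the whole channel."""
    return f"<!channel> {message}"


def tag_user(message: str, user_id: str) -> str:
    """Prefix the message with a mention of one user, given their Slack user ID."""
    return f"<@{user_id}> {message}"
```

For instance, tag_user("Failure in deleting file", "UNKDY50FH") returns "<@UNKDY50FH> Failure in deleting file", which can be used directly as the message value in the payload.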

How to invoke lambda from code

NodeJS

var AWS = require('aws-sdk');

// You shouldn't hardcode your keys in production! See http://docs.aws.amazon.com/AWSJavaScriptSDK/guide/node-configuring.html
// The region matches the one used in the Python example below.
AWS.config.update({accessKeyId: 'akid', secretAccessKey: 'secret', region: 'ap-south-1'});

var lambda = new AWS.Lambda();
var params = {
  FunctionName: 'slice-slack-notifier', /* required */
  // Payload as defined above, serialised to a JSON string
  Payload: JSON.stringify({
    "heading": "Heading",
    "message": "Message",
    "slack_channel": "slice-lambda-notifications",
    "attachment": {
      "bucket": "slack-notification-framework-attachments",
      "key": "testing/test_file.csv"
    }
  })
};
lambda.invoke(params, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});


Python

import json

import boto3

boto3_session = boto3.Session(region_name="ap-south-1")
lambda_client = boto3_session.client("lambda")

# Payload as defined above, serialised to JSON bytes
lambda_client.invoke(
    FunctionName='slice-slack-notifier',
    InvocationType='RequestResponse',
    Payload=json.dumps(
        {
            "heading": "Heading",
            "message": "Message",
            "slack_channel": "slice-lambda-notifications",
            "attachment": {
                "bucket": "slack-notification-framework-attachments",
                "key": "testing/test_file.csv"
            }
        }
    ).encode("utf-8")
)



Error in payload / Notification message:

In case an error occurs while sending the notification because of the payload or message format, a notification with the relevant error message is sent to the slack channel #jarvis-error-notifications.

That was all about Jarvis and how it has simplified transmission of notifications into our slack channels with or without an attachment. Here at slice, we are constantly working towards creating new tools, every day, to streamline our workflow. So, stay tuned for more!

S3 to Redshift / RDS data propagator

The one thing that is almost always taken for granted by firms is data. Be it customer information, sensitive information from the industry, trade secrets exclusive to the company or even basic employee information — data should be like treasure and its protection the utmost priority. However, this is easier said than done. Data is of no use if it’s always hidden and padlocked from additions and modifications. So, what do you do in such a situation? You monitor and filter!

The RDS data propagator does exactly that, minus any extra workforce. It is an automated tool that creates the required tables and loads data into them, triggered by S3 file upload operations. These, in turn, perform insert and update operations on Redshift tables, following best practices for creating, inserting into and updating tables for analytics.

So, now you must be thinking, what problems exactly did this automation solve and how did this help slice in streamlining its data? Well, here’s how:

How RDS data propagator streamlined data handling at Slice 

Here at Slice, we regularly use data to analyse trends and assess risk factors. In general, we can categorise data-related users in 2 distinct segments. They are:

  1. The Producers – (Software, SDEs, ETL developers)
  2. The Consumers – (Data Analysts, Risk Analysts)

Until now, the bridge between the producers and consumers was the Database Administrator (DBA).

But that’s old school now. In the current IT Startup culture, maintaining the above structure is not always possible due to cost crunches, fast-paced development, small team sizes, etc. In such a scenario, both the producers (developers) and consumers (analysts) eventually have to perform certain tasks that used to be exclusive to the DBA in order to solve their immediate data needs. While doing so, people often fail to make proper decisions about the underlying schema of their tables. This could be due to a lack of knowledge, or sometimes an inability to anticipate the future needs of the data. The biggest downside? In the long run, this leads to bad query performance as the data size increases.

Our case, you see, was no different. Let me take you through our story for a better viewpoint!

[Figure: an analyst’s workflow. Reading data from a given table; creating a table (What keys? Primary, foreign, sort and distribution keys. Solution? Research or ask for help); inserting into or updating a table.]

The part where we read the data from the tables was fine. 

The catch, however, lay in creating tables or inserting/updating data in those tables for analytical use cases. The Analysts dreaded performing such operations. Also, from a data security perspective, permission to run DML queries can’t be given to everyone. In such scenarios the DBA used to be the go-to person for help. However, with a very small data team in the firm, a quick turnaround was not feasible: these small but important tasks had to be prioritized at the expense of other tasks at hand. With a growing Analyst team and various data stores being added, these requests became more and more frequent, and as the old saying in IT goes:

“If you have to do it more than twice, AUTOMATE IT.”

Hence, this tool was built.

So now that you know why it is required, let’s look into the technicalities. 

Here are the tactical details!

Technology Stack

  • AWS S3
  • AWS Lambda
  • AWS Redshift
  • Slack (for notifications)

Architecture

Features

  • Creating Redshift tables, with a check that no table is created without sort and distribution keys.
  • Updating table schemas.
  • Inserting and updating data into tables.
  • Success and failure notifications.
  • Scaled-up performance through Lambda.

Usage Guide

The tool uses S3 to process files: an AWS lambda function listens to all S3 object creation events. Based on the rules specified in the sections below, the lambda takes action and runs the required Redshift queries.

The S3 to Redshift data propagator follows a strict folder and file structure, as shown in the architecture diagram:

  • <destination_name> : Refers to the type of DB being used like: redshift, postgres, etc
  • <database_name>: DB name (postgres) or Schema name (redshift) in which the table is present
  • <table_name>: Name of the respective table which will contain the data
  • schema:
    • create: contains a json file specifying the column names and datatypes along with the primary, foreign, sort and distribution keys.
    • update: contains a json file specifying new column(s) to be added to the existing table
  • input:
    • load: csv files to be inserted directly into the tables are placed in this folder.
    • update: csv files to be used for upserting into the tables are placed here.
      • Note: the update functionality works only if the table has a primary key defined.
  • succeeded: csv files that are loaded into the tables successfully are moved to this folder.
  • failed: csv files that fail to get loaded into the tables are moved to this folder.
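
To show how a lambda could decide what to do with an uploaded object, here is a sketch that parses an S3 key following the folder structure above. The key format is taken from the paths listed later in this post; the PropagatorKey type, the parse_key helper and the rule of ignoring anything that does not match are assumptions for illustration.

```python
import re
from typing import NamedTuple, Optional


class PropagatorKey(NamedTuple):
    destination: str
    database: str
    table_name: str
    stage: str      # "schema" or "input"
    action: str     # "create", "update" or "load"
    file_name: str


_KEY_PATTERN = re.compile(
    r"^destination=(?P<destination>[^/]+)/"
    r"database=(?P<database>[^/]+)/"
    r"table_name=(?P<table_name>[^/]+)/"
    r"(?P<stage>schema|input)/"
    r"(?P<action>create|update|load)/"
    r"(?P<file_name>[^/]+)$"
)

# Valid folder combinations and the file type each one accepts.
_EXPECTED_EXTENSION = {
    ("schema", "create"): ".json",
    ("schema", "update"): ".json",
    ("input", "load"): ".csv",
    ("input", "update"): ".csv",
}


def parse_key(key: str) -> Optional[PropagatorKey]:
    """Return the parsed key, or None if it is not an actionable upload."""
    match = _KEY_PATTERN.match(key)
    if match is None:
        return None
    parsed = PropagatorKey(**match.groupdict())
    extension = _EXPECTED_EXTENSION.get((parsed.stage, parsed.action))
    if extension is None or not parsed.file_name.endswith(extension):
        return None
    return parsed
```

Objects moved into the succeeded and failed folders do not match the pattern, so a handler built this way would not re-trigger itself on them.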


Creating Redshift tables

Schema definition:

The schema of a table should be provided in the format displayed below:

{
"column_specification" : {
"<column_name>" : "<redshift_data_type>"
},
"primary_key" : ["column1", "column2", ...], # optional
"dist_key" : "<column_name>", # required
"sort_key" : ["column1", "column2", ...] # required
}

Eg:

{
"column_specification": {
"id": "INTEGER",
"name": "VARCHAR",
"phone_no": "INTEGER",
"time_stamp": "VARCHAR",
"uuid": "VARCHAR"
},
"primary_key": [
"uuid"
],
"dist_key": "uuid",
"sort_key": [
"uuid"
]
}

Create a JSON file following the specification above.

Path to load the file : s3://****-data-propagator/destination=<destination_name>/database=<database_name>/table_name=<table_name>/schema/create/<file_name>.json

Once the file is loaded into the appropriate path, the lambda triggers and fires a redshift command to create a table as per the schema details provided in the JSON file.
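
The translation from the schema JSON to a Redshift statement can be sketched as follows. This is an illustration rather than the tool’s actual code: build_create_table is a hypothetical name, and column names and types are interpolated as-is, so the sketch assumes the JSON comes from a trusted source. It does enforce the rule that no table is created without distribution and sort keys.

```python
def build_create_table(schema_name: str, table_name: str, spec: dict) -> str:
    """Build a CREATE TABLE statement from a schema specification dict."""
    columns = spec.get("column_specification") or {}
    dist_key = spec.get("dist_key")
    sort_key = spec.get("sort_key") or []
    primary_key = spec.get("primary_key") or []

    if not columns:
        raise ValueError("column_specification must define at least one column")
    if not dist_key or not sort_key:
        raise ValueError("dist_key and sort_key are required")

    # Every key must refer to a column that is actually being created.
    unknown = [c for c in [dist_key, *sort_key, *primary_key] if c not in columns]
    if unknown:
        raise ValueError(f"keys refer to undefined columns: {unknown}")

    definitions = [f"{name} {dtype}" for name, dtype in columns.items()]
    if primary_key:
        definitions.append(f"PRIMARY KEY ({', '.join(primary_key)})")

    return (
        f"CREATE TABLE {schema_name}.{table_name} (\n  "
        + ",\n  ".join(definitions)
        + "\n)\n"
        + f"DISTKEY ({dist_key})\n"
        + f"SORTKEY ({', '.join(sort_key)});"
    )
```

Fed the example specification above, the sketch produces a statement with five column definitions, PRIMARY KEY (uuid), DISTKEY (uuid) and SORTKEY (uuid).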

Updating table schemas

There are two types of operations supported under this:

  1. Adding column(s)
  2. Deleting column(s)

Adding columns:

Table columns to be added should be provided in the format displayed below:

{
"column_specification" : {
"<column_name>" : "<redshift_data_type>"
}, #required
"operation_type" : "add" #required
}

eg:

{
"column_specification": {
"city": "VARCHAR",
"state": "VARCHAR"
},
"operation_type": "add"
}

Create a JSON file following the specification above.

Deleting column(s):

Table columns to be deleted should be provided in the format displayed below:

{
"column_specification" : ["column1", "column2", ...], # required
"operation_type" : "drop" # required
}

eg:

{
"column_specification": ["city", "state"],
"operation_type": "drop"
}

Create a JSON file following the specification above.

Path to load the file : s3://*****-data-propagator/destination=<destination_name>/database=<database_name>/table_name=<table_name>/schema/update/<file_name>.json

Once the file is loaded into the appropriate path, the columns in the specified redshift table are added or removed (dropped).

Note: Redshift doesn’t support adding or dropping multiple columns using a single query. However, this feature adds the functionality to support adding and dropping multiple columns by providing the info in the JSON file as specified above.
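
One way to support several columns per file is to expand the JSON into one ALTER TABLE statement per column. The sketch below does that for both operation types; build_alter_statements is a hypothetical helper, and names and types are interpolated without escaping, so it assumes trusted input.

```python
def build_alter_statements(schema_name: str, table_name: str, spec: dict) -> list:
    """Expand an add/drop specification into one ALTER TABLE per column."""
    operation = spec.get("operation_type")
    columns = spec.get("column_specification")
    target = f"{schema_name}.{table_name}"

    if operation == "add":
        # For "add", columns map each new column name to its data type.
        if not isinstance(columns, dict) or not columns:
            raise ValueError("'add' needs a non-empty object of column: type")
        return [
            f"ALTER TABLE {target} ADD COLUMN {name} {dtype};"
            for name, dtype in columns.items()
        ]

    if operation == "drop":
        # For "drop", columns is simply a list of column names.
        if not isinstance(columns, list) or not columns:
            raise ValueError("'drop' needs a non-empty list of column names")
        return [f"ALTER TABLE {target} DROP COLUMN {name};" for name in columns]

    raise ValueError("operation_type must be 'add' or 'drop'")
```

For the add example above, this yields two statements, one adding city and one adding state, which can then be run one after the other.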

Inserting and updating data into tables

A CSV file containing the data to be uploaded into the redshift table should be uploaded into the S3 bucket.

Path to load the file for inserting : s3://*****-data-propagator/destination=<destination_name>/database=<database_name>/table_name=<table_name>/input/load/<file_name>.csv

Path to load the file for updating (upserting) : s3://*****-data-propagator/destination=<destination_name>/database=<database_name>/table_name=<table_name>/input/update/<file_name>.csv
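
A common way to upsert into Redshift is to load the file into a staging table, delete the matching rows from the target and insert the staged rows. The sketch below generates those statements; it illustrates that general pattern and is an assumption about the approach, not a copy of the tool’s queries. The IAM role ARN, the staging table name and the IGNOREHEADER 1 option (a CSV with a header row) are all hypothetical.

```python
def build_upsert_statements(target: str, primary_key: list,
                            s3_uri: str, iam_role_arn: str) -> list:
    """Generate staging-table upsert statements for one CSV file."""
    if not primary_key:
        raise ValueError("upsert needs a primary key to match rows on")

    staging = "staging_" + target.replace(".", "_")
    match = " AND ".join(f"{target}.{col} = {staging}.{col}" for col in primary_key)

    return [
        f"CREATE TEMP TABLE {staging} (LIKE {target});",
        f"COPY {staging} FROM '{s3_uri}' IAM_ROLE '{iam_role_arn}' "
        "FORMAT AS CSV IGNOREHEADER 1;",
        # Delete and insert in one transaction so readers never see a gap.
        "BEGIN;",
        f"DELETE FROM {target} USING {staging} WHERE {match};",
        f"INSERT INTO {target} SELECT * FROM {staging};",
        "END;",
        f"DROP TABLE {staging};",
    ]
```

This also shows why the update folder needs a primary key on the table: without one there is nothing to match staged rows against.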

Success and Failure notifications

The success or failure notification for any process started through this architecture is sent to a Slack channel; in our case, we send it to #data-loader-noitification.

Success Notification

Failure Notification

Other usage

Apart from handling manual file uploads, this tool is also used by our ETL cron jobs to manage the flow of data inserts and updates into Redshift. This is because it follows the best practices for inserting and updating data in Redshift and is easy to plug into both manual and automated data flows.

Future scope

  1. Allowing updates to only the required columns. Currently, the tool only supports updating an entire row.
  2. Adding a feature to support different file types like JSON, Parquet.
  3. Adding more destinations to the list, like Postgres, MySQL, etc.