How to build a cost-effective and robust streaming data pipeline

26 February 2024

Envision a situation where you're tasked with managing clickstream data received via Snowplow. This was precisely the challenge faced by one of our clients, who manages substantial amounts of customer data through their platform. In this blog post, we'll guide you through their request and our solution step by step. It all begins with setting the scene...

Our client's data contains lots of valuable information about how customers use the online platform, which in turn allows for tailored communication and several other business insights. This kind of data is constantly being generated in real time, necessitating a solution capable of managing it effectively. Furthermore, there is no existing blueprint or similar solution at the client's disposal for reference. So we can start from scratch.

Nevertheless, there are two specific constraints to consider:

  1. The solution must be constructed solely using AWS services, and
  2. Python must be utilized as the coding language.

You are aware of AWS's "Amazon Managed Service for Apache Flink", a service that can effectively handle the ingestion and processing of such data. However, upon closer examination, you discover that the Python API for Apache Flink is currently limited to version 1.15, which has severe limitations compared to its native Java counterpart. Consequently, pursuing this otherwise promising option would require deviating from the second of our two constraints.

Our mission is clear: we have to find alternative AWS services to develop a solution that can process the streaming data in a different manner!

Let's start with the Ingestion

To begin, we must establish a method for capturing and batching the streaming events, allowing us to store the raw data in our data lake. This initial step, known as ingestion, can be accomplished using Data Firehose (formerly known as Kinesis Data Firehose). This service handles the complexities of ingesting and buffering streaming data, letting us collect and batch the events based on data volume or time window size. It then creates compressed JSON files and deposits them in an S3 bucket. In our specific scenario, a new file is generated approximately every 5 minutes, with the file name indicating the date and time of its creation. Additionally, the automatic scaling capabilities of Data Firehose help prevent data loss or delays, providing peace of mind for future ingestion loads.
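For illustration, a producer could push events into the delivery stream with boto3 along these lines. The stream name below is hypothetical, and the buffering into S3 is configured on the delivery stream itself rather than in code:

```python
import json

import boto3

firehose = boto3.client("firehose")

# Hypothetical delivery stream name; buffer size/interval and the target
# S3 bucket are configured on the Firehose delivery stream, not here.
DELIVERY_STREAM = "clickstream-delivery-stream"


def send_event(event: dict) -> None:
    """Send a single clickstream event to Firehose, which buffers it and
    periodically writes compressed JSON files to S3."""
    firehose.put_record(
        DeliveryStreamName=DELIVERY_STREAM,
        Record={"Data": (json.dumps(event) + "\n").encode("utf-8")},
    )
```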

Time for Standardization

Now that we have successfully stored the raw data in our data lake, the next step is to determine the subsequent course of action. Fortunately, S3 offers a useful feature that enables us to trigger an event notification whenever a new file is added to the bucket. Leveraging this functionality, we can send an event message to an SQS queue, which is then consumed by a Lambda function. Each message contains essential information such as the bucket and key details, which the Lambda function utilizes to load and process the events within each JSON file.
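As a rough sketch (the exact payload shape depends on how the S3 notification is delivered to SQS), the Lambda handler could unpack each message like this:

```python
import json
import urllib.parse

import boto3

s3 = boto3.client("s3")


def handler(event, context):
    """Entry point of the standardization Lambda: each SQS record wraps an
    S3 event notification pointing at a newly created raw JSON file."""
    for sqs_record in event["Records"]:
        s3_event = json.loads(sqs_record["body"])
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            key = urllib.parse.unquote_plus(s3_record["s3"]["object"]["key"])
            raw_file = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
            # ... standardization of the raw events happens here ...
```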

In this solution, the initial Lambda function encompasses the standardization logic, which can be quite intricate depending on the nature of the ingested raw data. Our objective here is to ensure that the data is cleaned and conforms to the necessary standards for subsequent analysis. We transform the unstructured JSON into structured parquet files using several methods, like employing the Python Pandera library to validate each field against a schema, thereby ensuring the correctness of the data types.
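A simplified Pandera schema might look like the following; the fields shown are illustrative stand-ins for the actual clickstream columns:

```python
import pandas as pd
import pandera as pa

# Illustrative schema: real Snowplow events carry many more fields.
clickstream_schema = pa.DataFrameSchema(
    {
        "event_id": pa.Column(str, nullable=False),
        "user_id": pa.Column(str, nullable=True),
        "event_type": pa.Column(str, pa.Check.isin(["page_view", "click"])),
        "collector_tstamp": pa.Column(pa.DateTime),
    },
    coerce=True,  # cast columns to the declared dtypes where possible
)


def standardize(raw_events: list[dict]) -> pd.DataFrame:
    """Flatten the raw JSON events and validate them against the schema."""
    df = pd.json_normalize(raw_events)
    return clickstream_schema.validate(df)
```

The validated DataFrame can then be written out with `to_parquet` to produce the structured files described above.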

The incorporation of SQS as an intermediary step in our solution ensures the processing of every data file and establishes a loosely coupled architecture. This approach enables an event-driven design while maintaining flexibility. In the event that the Lambda function encounters difficulties processing the events within a data file, the message is redirected to a dead-letter queue (DLQ). This action triggers a CloudWatch alarm, which subsequently sends an email notification via SNS to alert us of the failure. To address these divergent events, the standardization code within the Lambda function can be adjusted accordingly. The message is then redriven, returning it to the SQS queue for the Lambda function to resume processing. This iterative process continues until the event successfully passes through. By adopting this approach, we can ensure that no data events are lost during the process.
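The redrive behaviour itself is configured on the queue. For example, with boto3 a dead-letter queue could be attached along these lines (the queue URL, DLQ ARN, and retry count are placeholders):

```python
import json

import boto3

sqs = boto3.client("sqs")

# Hypothetical resources; replace with your own queue URL and DLQ ARN.
MAIN_QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/standardization-queue"
DLQ_ARN = "arn:aws:sqs:eu-west-1:123456789012:standardization-dlq"

# After three failed processing attempts, a message is moved to the DLQ,
# where a CloudWatch alarm on the queue depth can trigger an SNS email.
sqs.set_queue_attributes(
    QueueUrl=MAIN_QUEUE_URL,
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": DLQ_ARN, "maxReceiveCount": 3}
        )
    },
)
```

Once the standardization code has been fixed, the affected messages can be redriven from the DLQ back to the main queue, for instance via the SQS console's redrive feature.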

Moreover, thanks to the loosely coupled nature of the architecture, any data files that do not contain divergent events will be processed seamlessly, as if no failure had occurred. The resulting standardized parquet files are subsequently stored in a dedicated S3 folder referred to as the "Parquet queue." It is important to note that this is not an SQS queue but rather a temporary storage location for the standardized files before the concatenation step.

Wrap it up with Concatenation

To optimize the handling of numerous small files, a more efficient approach is to concatenate them on an hourly basis. This consolidation results in a single file per hour, which proves advantageous for querying with Athena. By reducing the number of files, we enhance the efficiency of our queries. An EventBridge trigger is set to activate a Step Functions State Machine containing a second Lambda function at 20 minutes past each hour. This function verifies, based on their file names, whether all standardized parquet files from the previous hour have been received. Once confirmed, it retrieves these files from the Parquet queue, combines them into a single hourly file, stores it in the designated partitioned S3 bucket, and removes the source files from the Parquet queue. Additionally, it updates the Athena table partition, enabling us to query the newly concatenated and standardized data within our data lake. Finally, we configured an SNS notification to alert us in the event of a State Machine failure, ensuring that we receive timely notifications when necessary.
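A condensed sketch of such a concatenation Lambda, using the AWS SDK for pandas (awswrangler) and hypothetical bucket names, could look like this; it assumes the partition columns were already derived during standardization:

```python
from datetime import datetime, timedelta, timezone

import awswrangler as wr  # assumption: packaged as a Lambda layer

# Hypothetical locations for the Parquet queue and the hourly files.
PARQUET_QUEUE = "s3://my-data-lake/parquet-queue/"
HOURLY_PATH = "s3://my-data-lake/standardized/"


def handler(event, context):
    """Concatenate the previous hour's standardized files into one hourly file."""
    previous_hour = datetime.now(timezone.utc) - timedelta(hours=1)
    hour_prefix = previous_hour.strftime("%Y-%m-%d-%H")

    # The standardized file names encode their creation time, so a simple
    # prefix filter selects everything belonging to the previous hour.
    source_files = wr.s3.list_objects(f"{PARQUET_QUEUE}{hour_prefix}")
    if not source_files:
        return  # nothing to concatenate (yet)

    df = wr.s3.read_parquet(source_files)

    # Write one partitioned hourly file and register the partition in the
    # Glue catalog so Athena can query it right away.
    wr.s3.to_parquet(
        df=df,
        path=HOURLY_PATH,
        dataset=True,
        partition_cols=["year", "month", "day", "hour"],
        database="clickstream",
        table="standardized_events",
    )

    # Clean up the Parquet queue.
    wr.s3.delete_objects(source_files)
```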

In this setup, it is crucial to consider scenarios where a set of events arrives later due to divergence. For instance, if a standardized file arrives at the Parquet queue hours after the initial concatenation step has passed, the concatenation function will not overwrite the previously created hourly file. Instead, it will append the events to that file, resulting in a consolidated file as if the delayed events had arrived on time.
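In code, that merge can be as simple as reading the existing hourly file, concatenating the late events, and writing the result back; the helper below is a hypothetical illustration using awswrangler:

```python
import awswrangler as wr
import pandas as pd


def merge_late_events(hourly_file: str, late_df: pd.DataFrame) -> None:
    """Fold late-arriving events into an already-written hourly parquet file."""
    existing = wr.s3.read_parquet(hourly_file)
    combined = pd.concat([existing, late_df], ignore_index=True).drop_duplicates()
    # Writing to the same path replaces the file with the consolidated version.
    wr.s3.to_parquet(df=combined, path=hourly_file)
```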

Conclusions after incorporating the solution

In summary, by leveraging AWS services and thinking outside the box, we have successfully designed a robust, event-driven, and loosely coupled solution to handle streaming events. Despite the challenges posed by the limitations of the Python API for Apache Flink, we explored alternative AWS services to process the data in a slightly different manner. Through the ingestion step, we captured and batched the streaming events using Data Firehose, storing the raw data in our data lake. The standardization process with Lambda ensured that the data was cleaned and conformed to the necessary standards for analysis, transforming unstructured JSON into structured parquet files. By incorporating SQS as an intermediary step, we established a fault-tolerant architecture that ensured the processing of every data file, while also allowing for flexibility in handling divergent events. The concatenation step optimized the handling of small files by consolidating them into hourly files, improving query efficiency with Athena. Through careful consideration of delayed events, we ensured that the concatenation function appended them to the appropriate files, maintaining data integrity.

Overview of the complete solution

From a financial point of view, this serverless setup is also cost-effective since you pay only for the actual usage of each service. Data Firehose costs are determined by the volume of data ingested, while Lambda charges are based on the number of invocations and compute time, all while removing the necessity for provisioning and managing dedicated servers. This pay-as-you-go model enables cost optimization based on actual data processing requirements.

As a closing remark, while we have utilized clickstream data as an illustrative input source, it is worth noting that this solution is adaptable to any form of streaming data that a company may wish to leverage for advanced data analysis and machine learning objectives.

If you are struggling with similar challenges, we are always curious to hear and ready to look for a solution!