Streaming real-time data into Snowflake using Kinesis Streams
Gennadiy Gashev
Solutions architect
Significant growth of a product’s user base always creates challenges for data engineering teams. The volume of events produced by millions (or billions) of users makes it almost impossible to use standard ingestion solutions as is; the pipeline always has to be tuned for the particular situation.
The real-life example (with code) in this article shares our experience of building an efficient real-time data streaming pipeline for the World project. The architecture is based on AWS Kinesis and Snowflake, both acknowledged industry leaders in data processing services.
High-level architecture
We will create an AWS CDK stack that streams data into a Snowflake table using Kinesis Streams and Kinesis Data Firehose. This solution allows ingesting data from different accounts and regions into a Snowflake table in real time.
Amazon Kinesis Data Streams is a scalable and durable real-time data streaming service that can continuously capture gigabytes of data per second from hundreds of sources. Snowflake is a cloud-based data warehousing platform that allows you to store and analyze large volumes of data.
There are two ways to load data into Snowflake tables: Snowpipe and Snowpipe Streaming. Snowpipe loads data from files in micro-batches: essentially, streams are aggregated, written to interim storage, and then loaded into Snowflake. For a streaming workload this is highly inefficient, adds several minutes of latency, and increases costs.
To optimize cost and latency further, and to keep the setup simple, we introduce Firehose into the approach. Firehose with Snowpipe Streaming writes individual rows of data directly into tables, delivering records as soon as they become available; in turn, the data becomes queryable in Snowflake within seconds.
Snowflake setup
We will need a Snowflake user with permissions to insert data into the table, and key pair authentication configured for that user. Follow this guide; it should be relatively simple.
Next, create a secret in AWS Secrets Manager containing this private key. This secret will be used by the Kinesis Data Firehose delivery stream to authenticate with Snowflake.
aws secretsmanager create-secret \
--name Snowflake/PrivateKey \
--secret-string file://path/to/private-key-file
AWS setup
Let’s build a functional pipeline for streaming real-time data from Kinesis into Snowflake. We will need to set up a few resources, as well as IAM policies.
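All of the snippets below assume they live inside the constructor of a single CDK stack. Here is a minimal skeleton (the stack and file names are assumptions) that the following sections build on:

import * as cdk from 'aws-cdk-lib'
import {Construct} from 'constructs'

// Hypothetical stack skeleton; the KMS key, Kinesis stream, and Firehose
// delivery stream from the following sections are defined in the constructor.
export class SnowflakeStreamingStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props)
    // ...resources from the sections below go here...
  }
}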
KMS Key
We will start by creating a new customer-managed KMS key (CMK) for the Kinesis stream. This key will be used to encrypt the stream and to share access with the data producers. After creation, we will add a resource policy to the key that allows access to the key metadata for our account, and allows use of the key through Amazon Kinesis for all principals in the account that are authorized to use Amazon Kinesis.
Alternatively, you can use the default AWS-managed KMS key to encrypt the stream, but the AWS-managed key restricts access to the stream to resources in the same account only.
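For reference, a stream encrypted with the AWS-managed key would be a one-liner; this is just a sketch, and the construct id is an assumption:

// Uses the AWS-managed key aws/kinesis; only resources in this account can use the stream.
const managedKeyStream = new cdk.aws_kinesis.Stream(this, 'ManagedKeyStream', {
  encryption: cdk.aws_kinesis.StreamEncryption.MANAGED,
})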
const encryptionKey = new cdk.aws_kms.Key(this, 'Key', {})

encryptionKey.addToResourcePolicy(
  new cdk.aws_iam.PolicyStatement({
    sid: 'Allow direct access to key metadata to the account',
    effect: cdk.aws_iam.Effect.ALLOW,
    principals: [new cdk.aws_iam.AccountRootPrincipal()],
    actions: ['kms:Describe*', 'kms:Get*', 'kms:List*'],
    resources: ['*'],
  }),
)

encryptionKey.addToResourcePolicy(
  new cdk.aws_iam.PolicyStatement({
    sid: 'Allow use of the key through Amazon Kinesis for principals in the account that are authorized to use Amazon Kinesis',
    effect: cdk.aws_iam.Effect.ALLOW,
    principals: [new cdk.aws_iam.AnyPrincipal()],
    actions: ['kms:Encrypt', 'kms:Decrypt', 'kms:ReEncrypt*', 'kms:GenerateDataKey*', 'kms:DescribeKey'],
    resources: ['*'],
    conditions: {
      StringEquals: {
        'kms:ViaService': `kinesis.${cdk.Stack.of(this).region}.amazonaws.com`,
        'kms:CallerAccount': cdk.Stack.of(this).account,
      },
    },
  }),
)
Kinesis stream
The creation of the Kinesis stream is straightforward. We will create a new Kinesis stream with default parameters and use the previously created KMS key for encryption.
The tricky part is setting up cross-account access to the stream for producers. When the data producers and the Kinesis stream are in the same account, you can grant access simply with the .grantWrite() method in CDK.
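As a sketch of the same-account case, assuming a hypothetical producer role and the stream created further below:

// Hypothetical producer in the same account, e.g. an ECS task that publishes events.
const producerRole = new cdk.aws_iam.Role(this, 'ProducerRole', {
  assumedBy: new cdk.aws_iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
})

// grantWrite attaches kinesis:PutRecord/PutRecords to the role and, because the
// stream is encrypted with our CMK, also grants use of the key for encryption.
stream.grantWrite(producerRole)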
This does not work for cross-account producers, so you need to add a resource-based policy to the stream.
Unfortunately, the AWS CDK does not provide a high-level construct for adding resource-based policies to a Kinesis stream. We could fall back to the AWS CLI or AWS SDK, but a more elegant way is to write an AWS custom resource in CDK. Custom resources allow you to call AWS SDK methods during the deployment of the stack.
const stream = new cdk.aws_kinesis.Stream(this, 'Stream', {encryptionKey})

new cdk.custom_resources.AwsCustomResource(this, 'Resource', {
  onUpdate: {
    service: 'Kinesis',
    action: 'PutResourcePolicy',
    parameters: {
      ResourceARN: stream.streamArn,
      Policy: JSON.stringify({
        Version: '2012-10-17',
        Statement: [
          {
            Sid: 'Allow cross-account write access to the stream',
            Effect: 'Allow',
            Principal: {AWS: 'arn:aws:iam::123456789012:role/data-producer-example'},
            Action: ['kinesis:PutRecord', 'kinesis:PutRecords', 'kinesis:DescribeStreamSummary', 'kinesis:ListShards'],
            Resource: stream.streamArn,
          },
        ],
      }),
    },
    physicalResourceId: cdk.custom_resources.PhysicalResourceId.of('Resource'),
  },
  policy: cdk.custom_resources.AwsCustomResourcePolicy.fromSdkCalls({resources: [stream.streamArn]}),
})
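With this resource policy in place, a producer in the other account can write to the stream with the regular Kinesis API. Below is a minimal sketch using the AWS SDK for JavaScript v3; the region, the ARN placeholders, and the event shape are assumptions, and the producer role additionally needs an identity-based policy in its own account allowing these Kinesis actions. Note that cross-account calls must address the stream by ARN rather than by name.

import {KinesisClient, PutRecordCommand} from '@aws-sdk/client-kinesis'

const kinesis = new KinesisClient({region: 'eu-west-1'})

// Hypothetical producer-side helper that publishes a single event to the shared stream.
export async function publishEvent(event: {userId: string; eventType: string}) {
  await kinesis.send(
    new PutRecordCommand({
      // Cross-account access works only when the stream is addressed by its ARN.
      StreamARN: 'arn:aws:kinesis:eu-west-1:<stream-account-id>:stream/<stream-name>',
      PartitionKey: event.userId,
      Data: new TextEncoder().encode(JSON.stringify(event)),
    }),
  )
}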
Kinesis Data Firehose
The next step is to create a Kinesis Data Firehose delivery stream that will deliver records from the Kinesis stream to the Snowflake table. Firehose requires a role to read from the Kinesis stream. We will create a new role and grant it read access to the stream.
const role = new cdk.aws_iam.Role(this, 'Role', {
  assumedBy: new cdk.aws_iam.ServicePrincipal('firehose.amazonaws.com', {
    conditions: {StringEquals: {'sts:ExternalId': cdk.Stack.of(this).account}},
  }),
})
Let’s now create the Firehose delivery stream. It consumes data from the Kinesis stream and loads it into the Snowflake table. You should set the Snowflake account URL, database, schema, table, user, and role. Additionally, you need to provide the private key from the secret and the ARN of the IAM role created above.
const secret = cdk.aws_secretsmanager.Secret.fromSecretNameV2(this, 'Secret', 'Snowflake/PrivateKey')

const deliveryStream = new cdk.aws_kinesisfirehose.CfnDeliveryStream(this, 'DeliveryStream', {
  deliveryStreamType: 'KinesisStreamAsSource',
  kinesisStreamSourceConfiguration: {kinesisStreamArn: stream.streamArn, roleArn: role.roleArn},
  snowflakeDestinationConfiguration: {
    accountUrl: 'https://account.region.snowflakecomputing.com',
    database: 'database',
    schema: 'schema',
    table: 'table',
    user: 'username',
    snowflakeRoleConfiguration: {snowflakeRole: 'role', enabled: true},
    privateKey: secret.secretValue.toString(),
    roleArn: role.roleArn,
  },
})
Finally, we add the following line to grant the role read access to the Kinesis stream. The access must be granted before the delivery stream that uses this role is created, which is what applyBefore ensures.
stream.grantReadWrite(role).applyBefore(deliveryStream)
The full code can be downloaded here.
Additional Improvements
This article outlines the necessary processes for live stream ingestion into Snowflake, but there are additional optimizations that you can look into:
- Data Transformation. You can consider implementing data transformation before the ingest to cut down the processing time in Snowflake.
- S3 Backup Mode. Data durability in the case of anomalies can be improved by configuring s3BackupMode in Kinesis Firehose.
  - The FailedDataOnly mode stores only the failed records in S3; it is very helpful for troubleshooting and also cuts storage costs.
  - The alternative is the AllData mode, which stores all records (both successful and failed) in S3, making it the complete option for data audits and recovery.
- Stream Monitoring. Monitoring the Kinesis stream on a regular basis is a key factor in maintaining the reliability of the data flow (see the alarm sketch after this list).
  - Among the main metrics are IncomingRecords and IncomingBytes to track incoming data, PutRecords.FailedRecords, which indicates data delivery issues, and GetRecords.IteratorAgeMilliseconds, which reflects data processing latency. ReadProvisionedThroughputExceeded and WriteProvisionedThroughputExceeded are triggered when the stream throughput limits are exceeded.
  - Performance tracking and management is done rather straightforwardly using BytesPerSecondLimit and RecordsPerSecondLimit.
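As an example of the monitoring point, a CloudWatch alarm on the iterator age can be added to the same stack. This is only a sketch; the threshold and evaluation periods are assumptions to adjust to your latency requirements:

// Alerts when records wait in the stream for more than a minute at the maximum,
// i.e. the Firehose consumer is falling behind.
new cdk.aws_cloudwatch.Alarm(this, 'IteratorAgeAlarm', {
  metric: stream.metricGetRecordsIteratorAgeMilliseconds({statistic: 'Maximum'}),
  threshold: 60_000,
  evaluationPeriods: 3,
  comparisonOperator: cdk.aws_cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
})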