Dynamic S3 Snowflake Ingestion

1 - Getting Started

Overview

In this workshop, we will be taking signals coming from multiple edge nodes and loading them into Snowflake. We will utilize dynamic S3 location routing and ingesting the data into Snowflake using an external table and snowpipe. You will need access to your own Snowflake account to complete this workshop.

To accomplish this, we will:

  • Create a new Pipeline
  • Configure an endpoint (i.e., a Source) to receive the data
  • Send data to an S3 Destination, utilizing dynamic pathing
  • Create an external table in Snowflake that accesses S3 directly
  • Create a Snowpipe in Snowflake that automatically loads data into an internal table

2 - Create AWS S3 Bucket and Snowflake User

Step 1: Create a new S3 Bucket

Once you have logged into your AWS account, navigate to the S3 service and click the Create bucket button.

S3 Service

Give your bucket a name in the Bucket name field. Remember, bucket names are globally unique, so choose one that is meaningful for your organization.

Select the region where the bucket will reside. For best results, use the same region your Snowflake (on AWS) account runs in; this reduces network transfer costs.

You can accept the default settings for the remaining options. Click the Create bucket button.

Create S3 Bucket
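
If you prefer the command line, a rough AWS CLI equivalent is shown below. This is only a sketch: the bucket name mezmo-use1-snowflake-demo and region us-east-1 are examples, so substitute your own.

# Example only: bucket names are globally unique, so substitute your own.
# In regions other than us-east-1, also pass:
#   --create-bucket-configuration LocationConstraint=<region>
aws s3api create-bucket \
  --bucket mezmo-use1-snowflake-demo \
  --region us-east-1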

Step 2: Create AWS Mezmo Pipeline User

Navigate to the IAM service.

IAM Service

Navigate to the Users section. Click the Add users button.

IAM Add User

Here we are going to create a new user so that the Pipeline can write data to S3. Make sure to select “Access Key - Programmatic Access”, as you will need the resulting keys when setting up the S3 destination in the Pipeline.

Pipeline User

Next, create a new policy to attach to the user. This policy needs access to the S3 service with the ability to perform PutObject. In the Resources section, add the ARN of the S3 bucket created above. We want to give the Pipeline only the minimal access it requires to your AWS account.

Pipeline User Create Policy
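
For reference, a write-only policy along these lines would cover it. This is a hedged sketch created with the AWS CLI; the bucket name and policy name are examples only.

# Hypothetical minimal policy: lets the Pipeline user write objects into the
# demo bucket and nothing else. Substitute your own bucket name.
cat > mezmo-pipeline-s3-put.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": ["arn:aws:s3:::mezmo-use1-snowflake-demo/*"]
    }
  ]
}
EOF

# Create the policy so it can be attached to the new user in the console.
aws iam create-policy \
  --policy-name mezmo-pipeline-s3-put \
  --policy-document file://mezmo-pipeline-s3-put.json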

Next we want to attach the policy to the new user. You may need to click the refresh button on the policy list for the newly created policy to show up.

Pipeline User Attach Policy

Continue through the process to create the user. The last screen will contain an Access Key ID and a Secret Access Key. Copy these keys, as you will need them later when building the Pipeline.

Pipeline User Access Keys

Step 3: Create AWS Snowflake User

Repeat the steps above, but this time create a user so that Snowflake can read the data.

The attached policy will need permission to list the files in the S3 bucket as well as read access to the objects themselves.

Pipeline User Policy
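
For reference, a read-side policy could look like the sketch below (again, the names are examples). Snowflake's S3 access generally needs ListBucket and GetObject; GetBucketLocation and GetObjectVersion are commonly included as well.

# Hypothetical minimal read policy for the Snowflake user.
# s3:ListBucket/GetBucketLocation apply to the bucket itself,
# s3:GetObject/GetObjectVersion to the objects inside it.
cat > snowflake-s3-read.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": ["arn:aws:s3:::mezmo-use1-snowflake-demo"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion"],
      "Resource": ["arn:aws:s3:::mezmo-use1-snowflake-demo/*"]
    }
  ]
}
EOF

aws iam create-policy \
  --policy-name snowflake-s3-read \
  --policy-document file://snowflake-s3-read.json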

Remember to copy the Access Keys as we will need them later on when setting up the Snowflake integration.

3 - Create the Pipeline

Step 1: Create a new Pipeline

Once you have Pipeline enabled, go ahead and Create a new Pipeline. You will be prompted to name your Pipeline; call it whatever you like, but we will go with Snowflake-Integration. After creation, you will be taken to the following blank canvas.

Blank Pipeline

Step 2: Add the Source

This part's easy. Go to the Pipeline you created previously and click Add Source.

Add Source

From there, just select HTTP, give it a Title like Edge Devices, set Decoding Method to JSON and click Save.

Add Source

Now, we need to create an access key corresponding to the new HTTP source. Click on the HTTP source to bring up the following panel.

Added Source

Go ahead and click the Create new key button in the Access Key Management section. Here you can give the new access key a name of Edge Device Key and click the Create button.

Added Source

A new key will be generated for you to use and is displayed on the source details, as well as the HTTP address to send the data to. Be sure to copy this Access Key somewhere safe for later reference as you will not be able to view it again once the source node is updated. Click Update to save your changes.

Added Source
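
To sanity-check the endpoint, you can post a test event with curl. This is a hedged sketch: the exact ingest URL and authorization header are shown on the source's details panel, and the payload fields used here (event, device_id) are made up for illustration.

# Substitute the URL and key shown on the source details panel.
export PIPELINE_URL=<http_source_url>
export KEY=<http_secret_key>

# Send a single hypothetical JSON event to the HTTP source.
curl -X POST "${PIPELINE_URL}" \
  -H "Authorization: ${KEY}" \
  -H "Content-Type: application/json" \
  -d '{"event": "transaction", "device_id": "device-001"}'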

You now have an endpoint defined that can receive any data. If you run into trouble here, please check out our comprehensive Mezmo Platform workshop to learn how to utilize the system in depth.

Step 3: Add the S3 Destination

Next we will add S3 as a destination.

You will need some information from your AWS account here. Specifically, you will need the following items created during Step 2 of the previous section.

  • AWS Access Key ID
  • AWS Secret Access Key
  • AWS Bucket name
  • AWS Region

With the access information in hand:

  • Add a new Destination and select AWS S3
  • Give this Destination the title Snowflake Bucket
  • Enter your Access Key ID and Secret Access Key
  • Enter the Bucket name (we will go with mezmo-use1-snowflake-demo)
  • Enter the prefix device-sim/event_date=%F/event_name={{ .event }}/. This prefix enables dynamic location routing: %F expands to the date in YYYY-MM-DD format and {{ .event }} is replaced with the value of each message’s event field, producing paths like device-sim/event_date=2022-11-09/event_name=transaction/
  • Choose text for the Encoding, with a compression of gzip
  • Select the Region where you created your bucket (in this example we use us-east-1)
  • Click the Save button

Your destination should look similar to the image below.

Note: Make sure messages going to this S3 destination contain the dynamic event field used in the path. Any events that do not have the field will not be written to S3.

S3 Destination Definition
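
To make the dynamic prefix concrete, the snippet below mimics locally what the Pipeline does for a single message whose event field is transaction. The Pipeline performs this substitution itself, so this is purely illustrative.

# Illustration only: how the prefix template expands for one message.
EVENT_NAME=transaction          # value of the message's "event" field
EVENT_DATE=$(date +%F)          # %F is the strftime date, e.g. 2022-11-09
echo "device-sim/event_date=${EVENT_DATE}/event_name=${EVENT_NAME}/"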

Now let’s connect the Edge Device Source to the Snowflake Bucket Destination. For this example, we are not going to use any processors.

S3 Destination Connected

Now, simply Deploy your Pipeline.

4 - Run Simulation

Run the Simulation

Get the Docker Image

First you will need the Docker image used to simulate the devices. You can either use our Docker Hub image or clone the GitHub repo. If using the repo, follow the instructions in README.md to build it with one command. To learn more about using Docker, check out their brief overview.

Configure the Simulation

Next you need to configure the devices via environment variables. All you need is your Mezmo Pipeline Source Key (KEY) and the number of devices to run (NUMBER_DEVICES). Use the HTTP authorization key created in Step 2: Add the Source. To do this on macOS, simply run:

export KEY=<http_secret_key>
export NUMBER_DEVICES=25

Run it

In a terminal, run one of the following commands:

Mezmo’s Docker Hub

docker run -e KEY=${KEY} -e NUMBER_DEVICES=${NUMBER_DEVICES} -it mezmo/transaction-device-sim:0.1.0

Local after Building

See the GitHub repo for steps to build the Docker image.

docker run -e KEY=${KEY} -e NUMBER_DEVICES=${NUMBER_DEVICES} -it transaction-device-sim

Output

When running with the defaults, you should see data like the following streaming through in that terminal.

Device Simulation Output

Verify data is flowing into S3

Now let’s verify data is flowing to S3. It may take a few minutes for new data to show up, as data is buffered in the Pipeline for a few seconds before being written to S3.

S3 Data
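
You can also check from the command line with the AWS CLI; the bucket name below is the example used earlier, so substitute your own.

# List the objects the Pipeline has written under the dynamic prefix.
aws s3 ls s3://mezmo-use1-snowflake-demo/device-sim/ --recursive --human-readable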

5 - Create Snowflake Tables

Snowflake Integration

There are multiple ways to integrate with Snowflake once data flows into S3. In this section we are going to demonstrate two different ways to integrate the data.

  1. Snowflake External Table - This method is useful if you do not want to pull all the data into Snowflake, but still want to access it and perform SQL over it.
  2. Snowpipe Internal Table - This method is useful if you want to continuously load data from S3 into an internal Snowflake table.

Create Snowflake External Table

At this point we have data flowing to S3. Now we would like to create an external table that reads data directly from S3. This is a great option if you have a lot of data and need to access only a small portion of it.

The SQL below will:

  1. Create a new schema.
  2. Create a new file format. This file format indicates the data is JSON and compressed with GZIP.
  3. Create a new stage. This is a pointer to an S3 location along with access credentials and the file format of the data. You will need to substitute the S3 bucket name created previously and the Snowflake user's AWS Access Key and Secret Key.
  4. Create a new external table. This external table utilizes partitioning, so it does not have to read all of the data when a partition is given in the WHERE clause of a SELECT.
USE DATABASE MEZMO_DEMO;

CREATE SCHEMA DEVICE_DEMO;

CREATE FILE FORMAT MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT TYPE=JSON COMPRESSION=GZIP;

CREATE STAGE MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo url='s3://<bucket>'
credentials=(aws_key_id='<key>' aws_secret_key='<secret>')
file_format=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;

CREATE OR REPLACE EXTERNAL TABLE MEZMO_DEMO.DEVICE_DEMO.EVENT_EXT (
EVENT_DATE DATE as to_date(split_part(split_part(METADATA$FILENAME, '/', 2), '=', 2)), 
EVENT_NAME VARCHAR as split_part(split_part(METADATA$FILENAME, '/', 3), '=', 2)
)
PARTITION BY ( EVENT_DATE, EVENT_NAME )
WITH location =  @MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo/device-sim/
FILE_FORMAT=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;

Finally, we want to select some data to verify our connection is working. You will need to substitute the event_date field with a date from your S3 bucket.

SELECT event_date,
       VALUE:transaction.result::VARCHAR AS transaction_result,
       SUM(VALUE:transaction.total_price)
FROM MEZMO_DEMO.DEVICE_DEMO.EVENT_EXT
WHERE event_date = '<date>' AND event_name = 'transaction'
GROUP BY event_date, VALUE:transaction.result;

External Table Select

Create Snowpipe Internal Table

Now we would like to create a Snowpipe that reads data from S3 and loads it directly into an internal table. This option is great if you want to do near-real-time reporting, as data is continuously loaded into the database.

The SQL below will:

  1. Create a new schema.
  2. Create a new file format. This file format indicates the data is JSON and compressed with GZIP.
  3. Create a new stage. This is a pointer to an S3 location along with access credentials and the file format of the data. You will need to substitute the S3 bucket name created previously and the Snowflake user's AWS Access Key and Secret Key.
  4. Create a new pipe. This pipe utilizes a COPY command that runs the given SELECT statement for each new file that lands in S3.
USE DATABASE MEZMO_DEMO;

CREATE SCHEMA DEVICE_DEMO;

CREATE FILE FORMAT MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT TYPE=JSON COMPRESSION=GZIP;

CREATE STAGE MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo url='s3://<bucket>'
credentials=(aws_key_id='<key>' aws_secret_key='<secret>')
file_format=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;

CREATE OR REPLACE TABLE MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE ( 
  EVENT VARIANT,
  EVENT_DATE DATE,
  EVENT_NAME VARCHAR(64)
);

CREATE PIPE MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE AUTO_INGEST=TRUE AS
    COPY INTO MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE
    FROM (SELECT $1::VARIANT,
          to_date(split_part(split_part(METADATA$FILENAME, '/', 2), '=', 2)),
          split_part(split_part(METADATA$FILENAME, '/', 3), '=', 2)
          FROM @MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo/device-sim/);

Now that we have the pipe created, we need to get the SQS queue information associated with it so that we can enable an event notification on the S3 bucket. This notification tells Snowflake a new file exists whenever the Pipeline creates one.

SHOW PIPES;

Copy the SQS ARN from the notification_channel field.

Show Pipes

With the SQS ARN in hand, navigate back to your S3 bucket and add a new Event Notification. This can be found on the Properties tab of the bucket.

When creating the notification, give it an event name (we are using Snowpipe S3 Event) and a prefix (we are using device-sim/). Be sure to select All object create events under Event types. Under the Destination section, select SQS queue and Enter SQS queue ARN, then provide the ARN from the SHOW PIPES command in Snowflake.

S3 Event
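
If you would rather configure the notification from the CLI, a hedged sketch follows. Note that put-bucket-notification-configuration replaces the bucket's existing notification configuration, and the queue ARN placeholder must be the notification_channel value returned by SHOW PIPES.

# Hypothetical CLI equivalent of the console steps above. Substitute your
# bucket name and the SQS ARN from SHOW PIPES (notification_channel).
cat > snowpipe-notification.json <<'EOF'
{
  "QueueConfigurations": [
    {
      "Id": "snowpipe-s3-event",
      "QueueArn": "<notification_channel_arn>",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "device-sim/" }
          ]
        }
      }
    }
  ]
}
EOF

aws s3api put-bucket-notification-configuration \
  --bucket mezmo-use1-snowflake-demo \
  --notification-configuration file://snowpipe-notification.json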

Make sure the simulation is running and data is flowing through, as only new files will be loaded via Snowpipe.

After a few minutes, issue a SELECT statement against the Snowpipe table.

SELECT * FROM MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE;

You should see data in the table like the following.

Snowpipe Select

6 - Next Steps

Recap

We have now successfully connected a fleet of simulated devices to a Mezmo Pipeline and had the data dynamically routed to an S3 bucket. With data flowing, new files are created in S3, which are both exposed via an external table and ingested into an internal table via Snowpipe. We can now write SQL on top of the data, as shown in the image below.

Final Pipeline

Next Steps

To experience more of what Pipeline can do for you, take a peek at our Mezmo Platform workshop to utilize OpenTelemetry and find other ways you can take advantage of events holistically on the Mezmo Platform, or feel free to explore our other workshops at Mezmo Pipeline Workshops.