1 - Prerequisites
In this workshop, we will take signals coming from multiple edge nodes and load them into Snowflake. We will use dynamic S3 location routing and ingest the data into Snowflake with both an external table and Snowpipe. You will need access to your own Snowflake account to complete this workshop.
For this workshop, you will need access to the following:
An AWS account
A Mezmo account with Pipeline enabled
A Snowflake account
Docker, to run the device simulator
To accomplish this, we will:
Configure AWS with an S3 bucket and IAM users for Mezmo and Snowflake
Create a Mezmo Pipeline with an HTTP source and a dynamically routed S3 destination
Run a fleet of simulated edge devices with Docker
Create Snowflake tables over the S3 data, both as an external table and loaded via Snowpipe
2 - Configure AWS
Step 1: Create S3 Bucket
Make sure you have an AWS account.
Once you have logged into your AWS account, navigate to the S3 service and click the Create bucket button.
Give your bucket a name in the Bucket name field. Remember, bucket names are globally unique, so make it meaningful for your organization. Select the region where the bucket will reside. For best results, use the same region your Snowflake account (on AWS) is running in; this will reduce network transfer costs. You can accept the default settings for the rest of the options. Click the Create bucket button.
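If you prefer to script this step, a rough AWS CLI equivalent looks like the following sketch, using the bucket name and region this workshop uses later (adjust both for your organization):
# Create the workshop bucket (us-east-1 needs no LocationConstraint)
aws s3api create-bucket --bucket mezmo-use1-snowflake-demo --region us-east-1
# For any other region, add: --create-bucket-configuration LocationConstraint=<region>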
Step 2: Create AWS Mezmo Pipeline User
Navigate to the IAM service, go to the Users section, and click the Add users button. Here we are going to create a new user so that the pipeline can write data to S3. Make sure to select “Access Key - Programmatic Access”, as you will need this access key when setting up the S3 destination in the pipeline.
Next, create a new policy to attach to the user. This policy needs access to the S3 service, with the ability to PutObject. In the Resources section, add the ARN of the S3 bucket created above; we want to give the pipeline the minimal access required to your AWS account.
Next, attach the policy to the new user. You may need to click the refresh button on the policy list for the newly created policy to show up. Continue through the process to create the user. The last screen will contain an Access Key ID and a Secret Access Key. Copy these keys, as you will need them later on when building the pipeline.
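As a rough CLI sketch of the same step (the user name mezmo-pipeline-writer and policy name mezmo-pipeline-s3-write are made-up examples, and the bucket ARN assumes the bucket created above):
# Minimal write-only policy scoped to the workshop bucket
cat > pipeline-write-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::mezmo-use1-snowflake-demo/*"
    }
  ]
}
EOF
aws iam create-user --user-name mezmo-pipeline-writer
aws iam create-policy --policy-name mezmo-pipeline-s3-write --policy-document file://pipeline-write-policy.json
aws iam attach-user-policy --user-name mezmo-pipeline-writer --policy-arn arn:aws:iam::<account-id>:policy/mezmo-pipeline-s3-write
# Prints the Access Key ID and Secret Access Key; copy them now, they cannot be retrieved later
aws iam create-access-key --user-name mezmo-pipeline-writer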
Step 3: Create AWS Snowflake User
Repeat the steps above, but this time create a user so that Snowflake can read the data. The attached policy will need permission to list the files in the S3 bucket as well as read access to the objects. Remember to copy the access keys, as we will need them later on when setting up the Snowflake integration.
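The flow is the same as the sketch above, only the policy changes; a minimal read/list policy (again with made-up names, snowflake-reader and snowflake-s3-read) might look like this:
cat > snowflake-read-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::mezmo-use1-snowflake-demo/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::mezmo-use1-snowflake-demo"
    }
  ]
}
EOF
aws iam create-user --user-name snowflake-reader
aws iam create-policy --policy-name snowflake-s3-read --policy-document file://snowflake-read-policy.json
aws iam attach-user-policy --user-name snowflake-reader --policy-arn arn:aws:iam::<account-id>:policy/snowflake-s3-read
aws iam create-access-key --user-name snowflake-reader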
3 - Create the Pipeline
Make sure you have a Mezmo account.
Step 1: Create a new Pipeline
Once you have Pipeline enabled, go ahead and create a new Pipeline. You will be prompted to name your Pipeline; call it what you will, but we will go with Snowflake-Integration. After creation, you will be taken to the following blank canvas.
Step 2: Add the Source
This part's easy. Go to the pipeline you created previously and click Add Source. From there, just select HTTP, give it a Title like Edge Devices, set the Decoding Method to JSON, and click Save.
Now we need to create an access key corresponding to the new HTTP source. Click on the HTTP source to bring up its details panel. Click the Create new key button in the Access Key Management section. Here you can give the new access key a name of Edge Device Key and click the Create button.
A new key will be generated for you to use and is displayed on the source details, as well as the HTTP address to send the data to. Be sure to copy this Access Key somewhere safe for later reference, as you will not be able to view it again once the source node is updated. Click Update to save your changes.
You now have an endpoint defined that can receive any data. If you run into trouble here, please check out our comprehensive Mezmo Platform workshop to learn how to use the system in depth.
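If you would like to smoke-test the endpoint before wiring up the simulator, a curl call along these lines should work. This is only a sketch: the URL comes from the source details panel, the assumption that the access key is passed in an Authorization header may not match your setup (check the source details and documentation), and the payload is a made-up event whose event field matters for the dynamic S3 routing configured in the next step.
# Both values come from the HTTP source details panel
export PIPELINE_URL=<http_source_address>
export KEY=<http_secret_key>
# Send one sample JSON event (field names are illustrative only)
curl -X POST "${PIPELINE_URL}" \
  -H "Authorization: ${KEY}" \
  -H "Content-Type: application/json" \
  -d '{"event": "transaction", "transaction": {"result": "success", "total_price": 10.99}}'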
Step 3: Add the S3 Destination
Next we will add S3 as a destination. You will need some information from your AWS account here. Specifically, you will need the following values, which you created during Step 2 of the previous section:
AWS Access Key ID
AWS Secret Access Key
AWS Bucket name
AWS Region
With the access information in hand, add a new AWS S3 destination and configure it as follows:
Give it a Title like Snowflake Bucket
Enter the Access Key ID and Secret Access Key of the pipeline user
Enter the Bucket name (we will go with mezmo-use1-snowflake-demo)
Set the prefix to device-sim/event_date=%F/event_name={{ .event }}/. This prefix allows for dynamic location routing and will store data under a path built from the date and the event name coming from the event field, for example device-sim/event_date=2022-11-09/event_name=transaction/
Choose text for the Encoding, with a compression of gzip
Select the Region where you created your bucket (in this example we use us-east-1)
Click the Save button
Your destination should look similar to the image below.
Note: Make sure messages going to this S3 destination contain the dynamic fields used in the path. Any events that do not have those fields will not go to S3.
Now let's connect the Edge Devices Source to the Snowflake Bucket Destination. For this example, we are not going to use any processors. Now, simply Deploy your Pipeline.
4 - Run Simulation
Run the Simulation
Get the Docker
First you will need the Docker image for the device simulator. You can either use our Docker Hub image or clone the GitHub repo. If using the repo, follow the instructions in README.md to build it with one command. To learn more about using Docker, check out their brief overview.
Configure the Simulation
Next you need to configure the devices via environment variables. All you need is your Mezmo Pipeline source key (KEY) and the number of devices to run (NUMBER_DEVICES). Utilize the HTTP authorization key created in Step 2: Add the Source. To do this on macOS, simply run:
export KEY=<http_secret_key>
export NUMBER_DEVICES=25
Run it
In a terminal, run one of the following commands:
Mezmo's Docker Hub
docker run -e KEY=${KEY} -e NUMBER_DEVICES=${NUMBER_DEVICES} -it mezmo/transaction-device-sim:0.1.0
Local after Building
See the GitHub repo for steps to build the Docker image.
docker run -e KEY=${KEY} -e NUMBER_DEVICES=${NUMBER_DEVICES} -it transaction-device-sim
Output
When running with the default settings, you should see data like the following streaming through in that terminal.
Verify data is flowing into S3
Now let's verify data is flowing into S3. It may take a few minutes for new data to show up, as data is buffered in the pipeline for a few seconds before being written to S3.
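A quick way to check from a terminal (assuming the AWS CLI is configured with credentials that can read the bucket) is to list the objects under the device-sim/ prefix:
aws s3 ls s3://mezmo-use1-snowflake-demo/device-sim/ --recursive --human-readable
# Expect keys shaped like device-sim/event_date=2022-11-09/event_name=transaction/<file>.gz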
5 - Create Snowflake Tables
Snowflake Integration
There are multiple ways to integrate with Snowflake once data is flowing into S3. In this section we are going to demonstrate two of them.
Create Snowflake External Table
At this point we have data flowing into S3. Now we would like to create an external table that reads data directly from S3. This is a great option if you have a lot of data and only need to access a small portion of it. The SQL below creates a schema, a JSON file format, a stage pointing at the bucket (using the Snowflake user's access keys), and the external table itself.
USE DATABASE MEZMO_DEMO;
CREATE SCHEMA DEVICE_DEMO;
CREATE FILE FORMAT MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT TYPE=JSON COMPRESSION=GZIP;
CREATE STAGE MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo url='s3://<bucket>'
credentials=(aws_key_id='<key>' aws_secret_key='<secret>')
file_format=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;
CREATE OR REPLACE EXTERNAL TABLE MEZMO_DEMO.DEVICE_DEMO.EVENT_EXT
WITH location = @MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo/device-sim/
FILE_FORMAT=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;
CREATE OR REPLACE EXTERNAL TABLE MEZMO_DEMO.DEVICE_DEMO.EVENT_EXT (
EVENT_DATE DATE as to_date(split_part(split_part(METADATA$FILENAME, '/', 2), '=', 2)),
EVENT_NAME VARCHAR as split_part(split_part(METADATA$FILENAME, '/', 3), '=', 2)
)
PARTITION BY ( EVENT_DATE, EVENT_NAME )
WITH location = @MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo/device-sim/
FILE_FORMAT=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;
Finally, we want to select some data to verify the connection is working. You will need to substitute the event_date value with a date that exists in your S3 bucket.
Select event_date,
VALUE:transaction.result::VARCHAR as transaction_result,
sum(VALUE:transaction.total_price)
from MEZMO_DEMO.DEVICE_DEMO.EVENT_EXT where event_date = '<date>' and event_name = 'transaction'
GROUP BY event_date, VALUE:transaction.result;
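If the query returns nothing even though objects exist in S3, the external table metadata may simply not include files written after it was created. A manual refresh picks them up; the statement below is shown through snowsql (assuming a configured connection), but it runs equally well in a worksheet.
snowsql -q "ALTER EXTERNAL TABLE MEZMO_DEMO.DEVICE_DEMO.EVENT_EXT REFRESH;"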
Create Snowpipe Internal Table
Now we would like to create a Snowpipe that reads data from S3 and loads it directly into an internal table. This option is great if you want near real time reporting, as data is continuously loaded into the database. The SQL below creates the target table and a pipe that copies new files from the stage into it (the schema, file format, and stage statements are repeated here in case you skipped the external table section).
USE DATABASE MEZMO_DEMO;
CREATE SCHEMA DEVICE_DEMO;
CREATE FILE FORMAT MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT TYPE=JSON COMPRESSION=GZIP;
CREATE STAGE MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo url='s3://<bucket>'
credentials=(aws_key_id='<key>' aws_secret_key='<secret>')
file_format=MEZMO_DEMO.DEVICE_DEMO.JSON_FORMAT;
CREATE OR REPLACE TABLE MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE (
EVENT VARIANT,
EVENT_DATE DATE,
EVENT_NAME VARCHAR(64)
);
CREATE PIPE MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE AUTO_INGEST=TRUE AS
COPY INTO MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE
FROM (Select $1::VARIANT,
to_date(split_part(split_part(METADATA$FILENAME, '/', 2), '=', 2)),
split_part(split_part(METADATA$FILENAME, '/', 3), '=', 2)
FROM @MEZMO_DEMO.DEVICE_DEMO.mezmo_use1_snowflake_demo/device-sim/);
Now that we have the pipe created, we need to get the SQS queue information associated with it, so that we can enable an event notification on the S3 bucket that tells Snowflake a new file exists whenever the pipeline creates one.
SHOW PIPES;
Copy the SQS ARN from the notification_channel field.
With the SQS ARN in hand, navigate back to your S3 bucket and add a new Event Notification. This can be found on the Properties tab of the bucket. When creating the event, give it an event name (we are using Snowpipe S3 Event) and a Prefix (we are using device-sim/). Be sure to select All object create events under the event types. Under the Destination section, select SQS queue and Enter SQS queue ARN, then provide the ARN from the SHOW PIPES command in Snowflake.
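The console steps above are the simplest route. For reference, a rough CLI equivalent looks like the following sketch; note that put-bucket-notification-configuration replaces the bucket's entire notification configuration, and the queue ARN placeholder stands in for the notification_channel value from SHOW PIPES.
cat > notification.json <<'EOF'
{
  "QueueConfigurations": [
    {
      "Id": "snowpipe-s3-event",
      "QueueArn": "<sqs_arn_from_show_pipes>",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": { "Key": { "FilterRules": [ { "Name": "prefix", "Value": "device-sim/" } ] } }
    }
  ]
}
EOF
aws s3api put-bucket-notification-configuration --bucket mezmo-use1-snowflake-demo --notification-configuration file://notification.json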
Make sure the simulation is running and data is flowing through, as only new files will be loaded via Snowpipe. After a few minutes, issue a select statement against the Snowpipe table. You should see data in the table like the following.
Select * from MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE;
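If no rows appear after a few minutes, checking the pipe status is a quick way to see whether S3 notifications are reaching Snowflake (shown via snowsql with a configured connection; the same statement works in a worksheet):
snowsql -q "SELECT SYSTEM\$PIPE_STATUS('MEZMO_DEMO.DEVICE_DEMO.EVENT_PIPE');"
# Look for executionState RUNNING and a recent lastReceivedMessageTimestamp in the returned JSON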
6 - Next Steps
Recap
We have now successfully connected a fleet of simulated devices to a Mezmo Pipeline and had their data dynamically routed to an S3 bucket. With data flowing, new files are created in S3, which are both exposed through an external table and ingested via Snowpipe. We are now able to write SQL on top of the data, as shown in the image below.
Next Steps
To experience more of what Pipeline can do for you, take a peek at our Mezmo Platform workshop to utilize OpenTelemetry and find other ways you can take advantage of events holistically on the Mezmo Platform, or feel free to explore our other workshops at Mezmo Pipeline Workshops.