Snowflake Code Repository is a cloud-based repository provided by Snowflake that lets users manage and store SQL scripts and other code artifacts used for data integration, transformation, and analytics. It provides a centralized location for managing and sharing SQL scripts, views, and user-defined functions across Snowflake accounts, regions, and organizations.
With the Snowflake Code Repository, users can collaborate on code development, version control, and deployment workflows. It also lets users manage dependencies between code artifacts, track changes and revisions, and automate code testing and deployment.
The Snowflake Code Repository integrates with Snowflake's cloud data platform and other Snowflake services, including Snowflake Data Marketplace, Snowflake Data Exchange, and Snowflake Partner Connect. It also connects to third-party tools and platforms, such as GitHub, Bitbucket, and Jenkins, to extend code management and deployment capabilities.
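The GitHub/Bitbucket integration mentioned above is typically wired up through an API integration plus a Git repository object; a minimal sketch, assuming a public GitHub repo (organization, repository, and object names are placeholders, not taken from these notes):
// Hypothetical Git integration setup
CREATE OR REPLACE API INTEGRATION git_api_integration
API_PROVIDER = git_https_api
API_ALLOWED_PREFIXES = ('https://github.com/my-org')
ENABLED = TRUE;
CREATE OR REPLACE GIT REPOSITORY my_code_repo
API_INTEGRATION = git_api_integration
ORIGIN = 'https://github.com/my-org/my-repo.git';
// Browse files on a branch of the repository
LIST @my_code_repo/branches/main;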
Getting Started >> Getting to Know the Interface
Create Database:
- Create a database from the share:
create database snowflake_sample_data from share sfc_samples.sample_data;
- Grant the PUBLIC role access to the database; optionally change the role name to restrict access to a subset of users (see the quick check below):
grant imported privileges on database snowflake_sample_data to role public;
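A quick sanity check that the share is queryable (the sample schema/table below is the one used later in these notes):
select * from snowflake_sample_data.tpcds_sf100tcl.web_site limit 10;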
Loading Data In Snowflake:
// Rename database & create the table + metadata
ALTER DATABASE FIRST_DB RENAME TO OUR_FIRST_DB;
CREATE TABLE "OUR_FIRST_DB"."PUBLIC"."LOAN_PAYMENT" (
"Loan_ID" STRING,
"loan_status" STRING,
"Principal" STRING,
"terms" STRING,
"effective_date" STRING,
"due_date" STRING,
"paid_off_time" STRING,
"past_due_days" STRING,
"age" STRING,
"education" STRING,
"Gender" STRING);
// Check that the table is empty
USE DATABASE OUR_FIRST_DB;
SELECT * FROM LOAN_PAYMENT;
//Loading the data from S3 bucket
COPY INTO LOAN_PAYMENT
FROM s3://bucketsnowflakes3/Loan_payments_data.csv
file_format = (type = csv
field_delimiter = ','
skip_header=1);
//Validate
SELECT * FROM LOAN_PAYMENT;
Loading Data >> Create Stage
// Database to manage stage objects, file formats etc.
CREATE OR REPLACE DATABASE MANAGE_DB;
CREATE OR REPLACE SCHEMA external_stages;
// Creating external stage
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.aws_stage
url='s3://bucketsnowflakes3'
credentials=(aws_key_id='ABCD_DUMMY_ID' aws_secret_key='1234abcd_key');
// Description of external stage
DESC STAGE MANAGE_DB.external_stages.aws_stage;
// Alter external stage
ALTER STAGE aws_stage
SET credentials=(aws_key_id='XYZ_DUMMY_ID' aws_secret_key='987xyz');
// Publicly accessible staging area
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.aws_stage
url='s3://bucketsnowflakes3';
// List files in stage
LIST @aws_stage;
//Load data using copy command
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
FROM @aws_stage
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*';
COPY Command:
// Creating ORDERS table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT INT,
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS;
// First copy command
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
FROM @aws_stage
file_format = (type = csv field_delimiter=',' skip_header=1);
// Copy command with fully qualified stage object
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
FROM @MANAGE_DB.external_stages.aws_stage
file_format= (type = csv field_delimiter=',' skip_header=1);
// List files contained in stage
LIST @MANAGE_DB.external_stages.aws_stage;
// Copy command with specified file(s)
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
FROM @MANAGE_DB.external_stages.aws_stage
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails.csv');
// Copy command with pattern for file names
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
FROM @MANAGE_DB.external_stages.aws_stage
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*';
Transforming Data:
// Transforming using the SELECT statement
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM (select s.$1, s.$2 from @MANAGE_DB.external_stages.aws_stage s)
file_format= (type = csv field_delimiter=',' skip_header=1)
files=('OrderDetails.csv');
// Example 1 - Table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
ORDER_ID VARCHAR(30),
AMOUNT INT
)
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
// Example 2 - Table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
ORDER_ID VARCHAR(30),
AMOUNT INT,
PROFIT INT,
PROFITABLE_FLAG VARCHAR(30));
// Example 2 - Copy Command using a SQL function (subset of functions available)
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM (select
s.$1,
s.$2,
s.$3,
CASE WHEN CAST(s.$3 as int) < 0 THEN 'not profitable' ELSE 'profitable' END
from @MANAGE_DB.external_stages.aws_stage s)
file_format= (type = csv field_delimiter=',' skip_header=1)
files=('OrderDetails.csv');
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX
// Example 3 - Table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
ORDER_ID VARCHAR(30),
AMOUNT INT,
PROFIT INT,
CATEGORY_SUBSTRING VARCHAR(5));
// Example 3 - Copy Command using a SQL function (subset of functions available)
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM (select
s.$1,
s.$2,
s.$3,
substring(s.$5,1,5)
from @MANAGE_DB.external_stages.aws_stage s)
file_format= (type = csv field_delimiter=',' skip_header=1)
files=('OrderDetails.csv');
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX
Copy options & ON_ERROR:
// Create new stage
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.aws_stage_errorex
url='s3://bucketsnowflakes4';
// List files in stage
LIST @MANAGE_DB.external_stages.aws_stage_errorex;
// Create example table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
ORDER_ID VARCHAR(30),
AMOUNT INT,
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
// Demonstrating error message
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv');
// Validating table is empty
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX ;
// Error handling using the ON_ERROR option
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv')
ON_ERROR = 'CONTINUE';
// Validating results and truncating table
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;
// Error handling using the ON_ERROR option = ABORT_STATEMENT (default)
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
ON_ERROR = 'ABORT_STATEMENT';
// Validating results and truncating table
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;
// Error handling using the ON_ERROR option = SKIP_FILE
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
ON_ERROR = 'SKIP_FILE';
// Validating results and truncating table
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;
// Error handling using the ON_ERROR option = SKIP_FILE_2
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
ON_ERROR = 'SKIP_FILE_2';
// Validating results and truncating table
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
SELECT COUNT(*) FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
TRUNCATE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX;
// Error handling using the ON_ERROR option = SKIP_FILE_3%
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
ON_ERROR = 'SKIP_FILE_3%';
SELECT * FROM OUR_FIRST_DB.PUBLIC.ORDERS_EX;
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
ORDER_ID VARCHAR(30),
AMOUNT INT,
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv','OrderDetails_error2.csv')
ON_ERROR = SKIP_FILE_3
SIZE_LIMIT = 30;
File Format Object – File Format:
// Specifying file_format in Copy command
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format = (type = csv field_delimiter=',' skip_header=1)
files = ('OrderDetails_error.csv')
ON_ERROR = 'SKIP_FILE_3';
// Creating table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.ORDERS_EX (
ORDER_ID VARCHAR(30),
AMOUNT INT,
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
// Creating schema to keep things organized
CREATE OR REPLACE SCHEMA MANAGE_DB.file_formats;
// Creating file format object
CREATE OR REPLACE file format MANAGE_DB.file_formats.my_file_format;
// See properties of file format object
DESC file format MANAGE_DB.file_formats.my_file_format;
// Using file format object in Copy command
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (FORMAT_NAME=MANAGE_DB.file_formats.my_file_format)
files = ('OrderDetails_error.csv')
ON_ERROR = 'SKIP_FILE_3';
// Altering file format object
ALTER file format MANAGE_DB.file_formats.my_file_format
SET SKIP_HEADER = 1;
// Defining properties on creation of file format object
CREATE OR REPLACE file format MANAGE_DB.file_formats.my_file_format
TYPE=JSON,
TIME_FORMAT=AUTO;
// See properties of file format object
DESC file format MANAGE_DB.file_formats.my_file_format;
// Using file format object in Copy command
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format= (FORMAT_NAME=MANAGE_DB.file_formats.my_file_format)
files = ('OrderDetails_error.csv')
ON_ERROR = 'SKIP_FILE_3';
// Altering the type of a file format is not possible
ALTER file format MANAGE_DB.file_formats.my_file_format
SET TYPE = CSV;
// Recreate file format (default = CSV)
CREATE OR REPLACE file format MANAGE_DB.file_formats.my_file_format;
// See properties of file format object
DESC file format MANAGE_DB.file_formats.my_file_format;
// Truncate table
TRUNCATE table OUR_FIRST_DB.PUBLIC.ORDERS_EX;
// Overwriting properties of file format object
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS_EX
FROM @MANAGE_DB.external_stages.aws_stage_errorex
file_format = (FORMAT_NAME= MANAGE_DB.file_formats.my_file_format field_delimiter = ',' skip_header=1 )
files = ('OrderDetails_error.csv')
ON_ERROR = 'SKIP_FILE_3';
DESC STAGE MANAGE_DB.external_stages.aws_stage_errorex;
Copy Options >> VALIDATION_MODE:
---- VALIDATION_MODE ----
// Prepare database & table
CREATE OR REPLACE DATABASE COPY_DB;
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
// Prepare stage object
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
url='s3://snowflakebucket-copyoption/size/';
LIST @COPY_DB.PUBLIC.aws_stage_copy;
//Load data using copy command
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
VALIDATION_MODE = RETURN_ERRORS;
SELECT * FROM ORDERS;
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
VALIDATION_MODE = RETURN_5_ROWS ;
--- Use files with errors ---
create or replace stage copy_db.public.aws_stage_copy
url ='s3://snowflakebucket-copyoption/returnfailed/';
list @copy_db.public.aws_stage_copy;
-- show all errors --
copy into copy_db.public.orders
from @copy_db.public.aws_stage_copy
file_format = (type=csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
validation_mode=return_errors;
-- validate first n rows --
copy into copy_db.public.orders
from @copy_db.public.aws_stage_copy
file_format = (type=csv field_delimiter=',' skip_header=1)
pattern='.*error.*'
validation_mode=return_1_rows;
Rejected Records:
---- Use files with errors ----
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
url='s3://snowflakebucket-copyoption/returnfailed/';
LIST @COPY_DB.PUBLIC.aws_stage_copy;
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
VALIDATION_MODE = RETURN_ERRORS;
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
VALIDATION_MODE = RETURN_1_rows;
-------------- Working with error results -----------
---- 1) Saving rejected files after VALIDATION_MODE ----
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
VALIDATION_MODE = RETURN_ERRORS;
// Storing rejected /failed results in a table
CREATE OR REPLACE TABLE rejected AS
select rejected_record from table(result_scan(last_query_id()));
-- Adding additional records --
INSERT INTO rejected
select rejected_record from table(result_scan(last_query_id()));
SELECT * FROM rejected;
---- 2) Saving rejected files without VALIDATION_MODE ----
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
ON_ERROR=CONTINUE;
select * from table(validate(orders, job_id => '_last'));
---- 3) Working with rejected records ----
SELECT REJECTED_RECORD FROM rejected;
CREATE OR REPLACE TABLE rejected_values as
SELECT
SPLIT_PART(rejected_record,',',1) as ORDER_ID,
SPLIT_PART(rejected_record,',',2) as AMOUNT,
SPLIT_PART(rejected_record,',',3) as PROFIT,
SPLIT_PART(rejected_record,',',4) as QUANTITY,
SPLIT_PART(rejected_record,',',5) as CATEGORY,
SPLIT_PART(rejected_record,',',6) as SUBCATEGORY
FROM rejected;
SELECT * FROM rejected_values;
SIZE_LIMIT:
---- SIZE_LIMIT ----
// Prepare database & table
CREATE OR REPLACE DATABASE COPY_DB;
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
// Prepare stage object
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
url='s3://snowflakebucket-copyoption/size/';
// List files in stage
LIST @aws_stage_copy;
//Load data using copy command
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
SIZE_LIMIT=20000;
RETURN_FAILED_ONLY:
---- RETURN_FAILED_ONLY ----
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
// Prepare stage object
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
url='s3://snowflakebucket-copyoption/returnfailed/';
LIST @COPY_DB.PUBLIC.aws_stage_copy;
//Load data using copy command
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
RETURN_FAILED_ONLY = TRUE;
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
ON_ERROR =CONTINUE
RETURN_FAILED_ONLY = TRUE;
// Default = FALSE
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
ON_ERROR =CONTINUE;
TRUNCATECOLUMNS:
---- TRUNCATECOLUMNS ----
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(10),
SUBCATEGORY VARCHAR(30));
// Prepare stage object
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
url='s3://snowflakebucket-copyoption/size/';
LIST @COPY_DB.PUBLIC.aws_stage_copy;
//Load data using copy command
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*';
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
TRUNCATECOLUMNS = true;
SELECT * FROM ORDERS;
FORCE:
---- FORCE ----
CREATE OR REPLACE TABLE COPY_DB.PUBLIC.ORDERS (
ORDER_ID VARCHAR(30),
AMOUNT VARCHAR(30),
PROFIT INT,
QUANTITY INT,
CATEGORY VARCHAR(30),
SUBCATEGORY VARCHAR(30));
// Prepare stage object
CREATE OR REPLACE STAGE COPY_DB.PUBLIC.aws_stage_copy
url='s3://snowflakebucket-copyoption/size/';
LIST @COPY_DB.PUBLIC.aws_stage_copy;
//Load data using copy command
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*';
// Not possible to load files that have already been loaded and whose data has not been modified
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*';
SELECT * FROM ORDERS;
// Using the FORCE option
COPY INTO COPY_DB.PUBLIC.ORDERS
FROM @aws_stage_copy
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*Order.*'
FORCE = TRUE;
Load History:
-- Query load history within a database --
USE COPY_DB;
SELECT * FROM information_schema.load_history;
-- Query load history globally from SNOWFLAKE database --
SELECT * FROM snowflake.account_usage.load_history;
// Filter on specific table & schema
SELECT * FROM snowflake.account_usage.load_history
where schema_name='PUBLIC' and
table_name='ORDERS';
// Filter on specific table & schema where errors occurred
SELECT * FROM snowflake.account_usage.load_history
where schema_name='PUBLIC' and
table_name='ORDERS' and
error_count > 0;
// Filter on loads older than one day
SELECT * FROM snowflake.account_usage.load_history
WHERE DATE(LAST_LOAD_TIME) <= DATEADD(days,-1,CURRENT_DATE);
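For recent loads into a single table, the COPY_HISTORY table function (also used in the Snowpipe section below) is a lighter-weight alternative; the time window here is just an example:
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
table_name => 'ORDERS',
START_TIME => DATEADD(HOUR,-24,CURRENT_TIMESTAMP())));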
Loading unstructured data >> Create Stage & Load raw (JSON+1)
// First step: Load Raw JSON
CREATE OR REPLACE stage MANAGE_DB.EXTERNAL_STAGES.JSONSTAGE
url='s3://bucketsnowflake-jsondemo';
CREATE OR REPLACE file format MANAGE_DB.FILE_FORMATS.JSONFORMAT
TYPE = JSON;
CREATE OR REPLACE table OUR_FIRST_DB.PUBLIC.JSON_RAW (
raw_file variant);
COPY INTO OUR_FIRST_DB.PUBLIC.JSON_RAW
FROM @MANAGE_DB.EXTERNAL_STAGES.JSONSTAGE
file_format= MANAGE_DB.FILE_FORMATS.JSONFORMAT
files = ('HR_data.json');
SELECT * FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
Parse & Analyze (JSON+2):
// Second step: Parse & Analyze Raw JSON
// Selecting attribute/column
SELECT RAW_FILE:city FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
SELECT $1:first_name FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
// Selecting attribute/column - formatted
SELECT RAW_FILE:first_name::string as first_name FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT RAW_FILE:id::int as id FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:id::int as id,
RAW_FILE:first_name::STRING as first_name,
RAW_FILE:last_name::STRING as last_name,
RAW_FILE:gender::STRING as gender
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
// Handling nested data
SELECT RAW_FILE:job as job FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
Handling nested data (JSON+3):
// Handling nested data
SELECT RAW_FILE:job as job FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:job.salary::INT as salary
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:first_name::STRING as first_name,
RAW_FILE:job.salary::INT as salary,
RAW_FILE:job.title::STRING as title
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
// Handling arrays
SELECT
RAW_FILE:prev_company as prev_company
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:prev_company[1]::STRING as prev_company
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
ARRAY_SIZE(RAW_FILE:prev_company) as prev_company
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:id::int as id,
RAW_FILE:first_name::STRING as first_name,
RAW_FILE:prev_company[0]::STRING as prev_company
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
UNION ALL
SELECT
RAW_FILE:id::int as id,
RAW_FILE:first_name::STRING as first_name,
RAW_FILE:prev_company[1]::STRING as prev_company
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
ORDER BY id
Dealing with hierarchy (JSON+4):
SELECT
RAW_FILE:spoken_languages as spoken_languages
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT * FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
array_size(RAW_FILE:spoken_languages) as spoken_languages
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
SELECT
RAW_FILE:first_name::STRING as first_name,
array_size(RAW_FILE:spoken_languages) as spoken_languages
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
SELECT
RAW_FILE:spoken_languages[0] as First_language
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:first_name::STRING as first_name,
RAW_FILE:spoken_languages[0] as First_language
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW;
SELECT
RAW_FILE:first_name::STRING as First_name,
RAW_FILE:spoken_languages[0].language::STRING as First_language,
RAW_FILE:spoken_languages[0].level::STRING as Level_spoken
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
SELECT
RAW_FILE:id::int as id,
RAW_FILE:first_name::STRING as First_name,
RAW_FILE:spoken_languages[0].language::STRING as First_language,
RAW_FILE:spoken_languages[0].level::STRING as Level_spoken
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
UNION ALL
SELECT
RAW_FILE:id::int as id,
RAW_FILE:first_name::STRING as First_name,
RAW_FILE:spoken_languages[1].language::STRING as First_language,
RAW_FILE:spoken_languages[1].level::STRING as Level_spoken
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
UNION ALL
SELECT
RAW_FILE:id::int as id,
RAW_FILE:first_name::STRING as First_name,
RAW_FILE:spoken_languages[2].language::STRING as First_language,
RAW_FILE:spoken_languages[2].level::STRING as Level_spoken
FROM OUR_FIRST_DB.PUBLIC.JSON_RAW
ORDER BY ID
select
RAW_FILE:first_name::STRING as First_name,
f.value:language::STRING as First_language,
f.value:level::STRING as Level_spoken
from OUR_FIRST_DB.PUBLIC.JSON_RAW, table(flatten(RAW_FILE:spoken_languages)) f;
Insert the final data (JSON+5):
// Option 1: CREATE TABLE AS
CREATE OR REPLACE TABLE Languages AS
select
RAW_FILE:first_name::STRING as First_name,
f.value:language::STRING as First_language,
f.value:level::STRING as Level_spoken
from OUR_FIRST_DB.PUBLIC.JSON_RAW, table(flatten(RAW_FILE:spoken_languages)) f;
SELECT * FROM Languages;
truncate table languages;
// Option 2: INSERT INTO
INSERT INTO Languages
select
RAW_FILE:first_name::STRING as First_name,
f.value:language::STRING as First_language,
f.value:level::STRING as Level_spoken
from OUR_FIRST_DB.PUBLIC.JSON_RAW, table(flatten(RAW_FILE:spoken_languages)) f;
SELECT * FROM Languages;
Parquet data 1:
// Create file format and stage object
CREATE OR REPLACE FILE FORMAT MANAGE_DB.FILE_FORMATS.PARQUET_FORMAT
TYPE = 'parquet';
CREATE OR REPLACE STAGE MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE
url = 's3://snowflakeparquetdemo'
FILE_FORMAT = MANAGE_DB.FILE_FORMATS.PARQUET_FORMAT;
// Preview the data
LIST @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE;
SELECT * FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE;
// File format in Queries
CREATE OR REPLACE STAGE MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE
url = 's3://snowflakeparquetdemo';
SELECT *
FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE
(file_format => 'MANAGE_DB.FILE_FORMATS.PARQUET_FORMAT')
// Quotes can be omitted if the file format is in the current namespace
USE MANAGE_DB.FILE_FORMATS;
SELECT *
FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE
(file_format => MANAGE_DB.FILE_FORMATS.PARQUET_FORMAT);
CREATE OR REPLACE STAGE MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE
url = 's3://snowflakeparquetdemo'
FILE_FORMAT = MANAGE_DB.FILE_FORMATS.PARQUET_FORMAT;
// Syntax for Querying unstructured data
SELECT
$1:index_level_0,
$1:cat_id,
$1:date,
$1:"index_level_0",
$1:"cat_id",
$1:"d",
$1:"date",
$1:"dept_id",
$1:"id",
$1:"item_id",
$1:"state_id",
$1:"store_id",
$1:"value"
FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE;
// Date conversion
SELECT 1;
SELECT DATE(365*60*60*24);
// Querying with conversions and aliases
SELECT
$1:index_level_0::int as index_level,
$1:cat_id::VARCHAR(50) as category,
DATE($1:date::int ) as Date,
$1:"dept_id"::VARCHAR(50) as Dept_ID,
$1:"id"::VARCHAR(50) as ID,
$1:"item_id"::VARCHAR(50) as Item_ID,
$1:"state_id"::VARCHAR(50) as State_ID,
$1:"store_id"::VARCHAR(50) as Store_ID,
$1:"value"::int as value
FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE;
Parquet data 2:
// Adding metadata
SELECT
$1:index_level_0::int as index_level,
$1:cat_id::VARCHAR(50) as category,
DATE($1:date::int ) as Date,
$1:"dept_id"::VARCHAR(50) as Dept_ID,
$1:"id"::VARCHAR(50) as ID,
$1:"item_id"::VARCHAR(50) as Item_ID,
$1:"state_id"::VARCHAR(50) as State_ID,
$1:"store_id"::VARCHAR(50) as Store_ID,
$1:"value"::int as value,
METADATA$FILENAME as FILENAME,
METADATA$FILE_ROW_NUMBER as ROWNUMBER,
TO_TIMESTAMP_NTZ(current_timestamp) as LOAD_DATE
FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE;
SELECT TO_TIMESTAMP_NTZ(current_timestamp)
// Create destination table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.PARQUET_DATA (
ROW_NUMBER int,
index_level int,
cat_id VARCHAR(50),
date date,
dept_id VARCHAR(50),
id VARCHAR(50),
item_id VARCHAR(50),
state_id VARCHAR(50),
store_id VARCHAR(50),
value int,
Load_date timestamp default TO_TIMESTAMP_NTZ(current_timestamp))
// Load the parquet data
COPY INTO OUR_FIRST_DB.PUBLIC.PARQUET_DATA
FROM (SELECT
METADATA$FILE_ROW_NUMBER,
$1:index_level_0::int,
$1:cat_id::VARCHAR(50),
DATE($1:date::int ),
$1:"dept_id"::VARCHAR(50),
$1:"id"::VARCHAR(50),
$1:"item_id"::VARCHAR(50),
$1:"state_id"::VARCHAR(50),
$1:"store_id"::VARCHAR(50),
$1:"value"::int,
TO_TIMESTAMP_NTZ(current_timestamp)
FROM @MANAGE_DB.EXTERNAL_STAGES.PARQUETSTAGE);
SELECT * FROM OUR_FIRST_DB.PUBLIC.PARQUET_DATA;
Performing optimization >> Dedicated VW:
// Create virtual warehouse for data scientist & DBA
// Data Scientists
CREATE WAREHOUSE DS_WH
WITH WAREHOUSE_SIZE = 'SMALL'
WAREHOUSE_TYPE = 'STANDARD'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD';
// DBA
CREATE WAREHOUSE DBA_WH
WITH WAREHOUSE_SIZE = 'XSMALL'
WAREHOUSE_TYPE = 'STANDARD'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD';
// Create role for Data Scientists & DBAs
CREATE ROLE DATA_SCIENTIST;
GRANT USAGE ON WAREHOUSE DS_WH TO ROLE DATA_SCIENTIST;
CREATE ROLE DBA;
GRANT USAGE ON WAREHOUSE DBA_WH TO ROLE DBA;
// Setting up users with roles
// Data Scientists
CREATE USER DS1 PASSWORD = 'DS1' LOGIN_NAME = 'DS1' DEFAULT_ROLE='DATA_SCIENTIST' DEFAULT_WAREHOUSE = 'DS_WH' MUST_CHANGE_PASSWORD = FALSE;
CREATE USER DS2 PASSWORD = 'DS2' LOGIN_NAME = 'DS2' DEFAULT_ROLE='DATA_SCIENTIST' DEFAULT_WAREHOUSE = 'DS_WH' MUST_CHANGE_PASSWORD = FALSE;
CREATE USER DS3 PASSWORD = 'DS3' LOGIN_NAME = 'DS3' DEFAULT_ROLE='DATA_SCIENTIST' DEFAULT_WAREHOUSE = 'DS_WH' MUST_CHANGE_PASSWORD = FALSE;
GRANT ROLE DATA_SCIENTIST TO USER DS1;
GRANT ROLE DATA_SCIENTIST TO USER DS2;
GRANT ROLE DATA_SCIENTIST TO USER DS3;
// DBAs
CREATE USER DBA1 PASSWORD = 'DBA1' LOGIN_NAME = 'DBA1' DEFAULT_ROLE='DBA' DEFAULT_WAREHOUSE = 'DBA_WH' MUST_CHANGE_PASSWORD = FALSE;
CREATE USER DBA2 PASSWORD = 'DBA2' LOGIN_NAME = 'DBA2' DEFAULT_ROLE='DBA' DEFAULT_WAREHOUSE = 'DBA_WH' MUST_CHANGE_PASSWORD = FALSE;
GRANT ROLE DBA TO USER DBA1;
GRANT ROLE DBA TO USER DBA2;
// Drop objects again
DROP USER DBA1;
DROP USER DBA2;
DROP USER DS1;
DROP USER DS2;
DROP USER DS3;
DROP ROLE DATA_SCIENTIST;
DROP ROLE DBA;
DROP WAREHOUSE DS_WH;
DROP WAREHOUSE DBA_WH;
Scaling Out:
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.WEB_SITE T1
CROSS JOIN SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.WEB_SITE T2
CROSS JOIN SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.WEB_SITE T3
CROSS JOIN (SELECT TOP 57 * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.WEB_SITE) T4
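Scaling out means adding clusters rather than resizing; a minimal sketch of a multi-cluster warehouse for such concurrent workloads (name and cluster counts are illustrative):
CREATE WAREHOUSE MULTI_CLUSTER_WH
WITH WAREHOUSE_SIZE = 'XSMALL'
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3
SCALING_POLICY = 'STANDARD'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE;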
Caching:
SELECT AVG(C_BIRTH_YEAR) FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL.CUSTOMER
// Setting up an additional user
CREATE ROLE DATA_SCIENTIST;
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE DATA_SCIENTIST;
CREATE USER DS1 PASSWORD = 'DS1' LOGIN_NAME = 'DS1' DEFAULT_ROLE='DATA_SCIENTIST' DEFAULT_WAREHOUSE = 'DS_WH' MUST_CHANGE_PASSWORD = FALSE;
GRANT ROLE DATA_SCIENTIST TO USER DS1;
Clustering:
// Publicly accessible staging area
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.aws_stage
url='s3://bucketsnowflakes3';
// List files in stage
LIST @MANAGE_DB.external_stages.aws_stage;
//Load data using copy command
COPY INTO OUR_FIRST_DB.PUBLIC.ORDERS
FROM @MANAGE_DB.external_stages.aws_stage
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.*OrderDetails.*';
// Create table
CREATE OR REPLACE TABLE ORDERS_CACHING (
ORDER_ID VARCHAR(30)
,AMOUNT NUMBER(38,0)
,PROFIT NUMBER(38,0)
,QUANTITY NUMBER(38,0)
,CATEGORY VARCHAR(30)
,SUBCATEGORY VARCHAR(30)
,DATE DATE)
INSERT INTO ORDERS_CACHING
SELECT
t1.ORDER_ID
,t1.AMOUNT
,t1.PROFIT
,t1.QUANTITY
,t1.CATEGORY
,t1.SUBCATEGORY
,DATE(UNIFORM(1500000000,1700000000,(RANDOM())))
FROM ORDERS t1
CROSS JOIN (SELECT * FROM ORDERS) t2
CROSS JOIN (SELECT TOP 100 * FROM ORDERS) t3
// Query Performance before Cluster Key
SELECT * FROM ORDERS_CACHING WHERE DATE = '2020-06-09'
// Adding Cluster Key & Compare the result
ALTER TABLE ORDERS_CACHING CLUSTER BY ( DATE )
SELECT * FROM ORDERS_CACHING WHERE DATE = '2020-01-05'
// Not ideal clustering & adding a different Cluster Key using function
SELECT * FROM ORDERS_CACHING WHERE MONTH(DATE)=11
ALTER TABLE ORDERS_CACHING CLUSTER BY ( MONTH(DATE) )
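To judge how well a cluster key fits the typical filters, the clustering information function can be checked on the table above (the column list is just the key used here):
SELECT SYSTEM$CLUSTERING_INFORMATION('ORDERS_CACHING', '(DATE)');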
Loading from AWS >> Create Storage integration:
// Create storage integration object
create or replace storage integration s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<iam_role_arn>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path2>/')
COMMENT = 'This is an optional comment';
// See storage integration properties to fetch external_id so we can update it in S3
DESC integration s3_int;
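If helpful, the two values needed on the AWS side (IAM user ARN and external ID) can be pulled straight out of the DESC output via RESULT_SCAN, run immediately after the DESC statement; the column names below assume the standard DESC result set:
SELECT "property", "property_value"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" IN ('STORAGE_AWS_IAM_USER_ARN','STORAGE_AWS_EXTERNAL_ID');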
Load data from S3:
// Create table first
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.movie_titles (
show_id STRING,
type STRING,
title STRING,
director STRING,
cast STRING,
country STRING,
date_added STRING,
release_year STRING,
rating STRING,
duration STRING,
listed_in STRING,
description STRING )
// Create file format object
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL','null')
empty_field_as_null = TRUE;
// Create stage object with integration object & file format object
CREATE OR REPLACE stage MANAGE_DB.external_stages.csv_folder
URL = 's3://<bucket>/<path>/'
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = MANAGE_DB.file_formats.csv_fileformat
// Use Copy command
COPY INTO OUR_FIRST_DB.PUBLIC.movie_titles
FROM @MANAGE_DB.external_stages.csv_folder
// Create file format object
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL','null')
empty_field_as_null = TRUE
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SELECT * FROM OUR_FIRST_DB.PUBLIC.movie_titles
Handling JSON:
// Taming the JSON file
// First query from S3 Bucket
SELECT * FROM @MANAGE_DB.external_stages.json_folder
// Introduce columns
SELECT
$1:asin,
$1:helpful,
$1:overall,
$1:reviewText,
$1:reviewTime,
$1:reviewerID,
$1:reviewTime,
$1:reviewerName,
$1:summary,
$1:unixReviewTime
FROM @MANAGE_DB.external_stages.json_folder
// Format columns & use DATE function
SELECT
$1:asin::STRING as ASIN,
$1:helpful as helpful,
$1:overall as overall,
$1:reviewText::STRING as reviewtext,
$1:reviewTime::STRING,
$1:reviewerID::STRING,
$1:reviewTime::STRING,
$1:reviewerName::STRING,
$1:summary::STRING,
DATE($1:unixReviewTime::int) as Reviewtime
FROM @MANAGE_DB.external_stages.json_folder
// Format columns & handle custom date
SELECT
$1:asin::STRING as ASIN,
$1:helpful as helpful,
$1:overall as overall,
$1:reviewText::STRING as reviewtext,
DATE_FROM_PARTS( <year>, <month>, <day> ),
$1:reviewTime::STRING,
$1:reviewerID::STRING,
$1:reviewTime::STRING,
$1:reviewerName::STRING,
$1:summary::STRING,
DATE($1:unixReviewTime::int) as Reviewtime
FROM @MANAGE_DB.external_stages.json_folder
// Use DATE_FROM_PARTS and see another difficulty
SELECT
$1:asin::STRING as ASIN,
$1:helpful as helpful,
$1:overall as overall,
$1:reviewText::STRING as reviewtext,
DATE_FROM_PARTS( RIGHT($1:reviewTime::STRING,4), LEFT($1:reviewTime::STRING,2), SUBSTRING($1:reviewTime::STRING,4,2) ),
$1:reviewerID::STRING,
$1:reviewTime::STRING,
$1:reviewerName::STRING,
$1:summary::STRING,
DATE($1:unixReviewTime::int) as unixReviewtime
FROM @MANAGE_DB.external_stages.json_folder
// Use DATE_FROM_PARTS and handle the case difficulty
SELECT
$1:asin::STRING as ASIN,
$1:helpful as helpful,
$1:overall as overall,
$1:reviewText::STRING as reviewtext,
DATE_FROM_PARTS(
RIGHT($1:reviewTime::STRING,4),
LEFT($1:reviewTime::STRING,2),
CASE WHEN SUBSTRING($1:reviewTime::STRING,5,1)=','
THEN SUBSTRING($1:reviewTime::STRING,4,1) ELSE SUBSTRING($1:reviewTime::STRING,4,2) END),
$1:reviewerID::STRING,
$1:reviewTime::STRING,
$1:reviewerName::STRING,
$1:summary::STRING,
DATE($1:unixReviewTime::int) as UnixReviewtime
FROM @MANAGE_DB.external_stages.json_folder
// Create destination table
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.reviews (
asin STRING,
helpful STRING,
overall STRING,
reviewtext STRING,
reviewtime DATE,
reviewerid STRING,
reviewername STRING,
summary STRING,
unixreviewtime DATE
)
// Copy transformed data into destination table
COPY INTO OUR_FIRST_DB.PUBLIC.reviews
FROM (SELECT
$1:asin::STRING as ASIN,
$1:helpful as helpful,
$1:overall as overall,
$1:reviewText::STRING as reviewtext,
DATE_FROM_PARTS(
RIGHT($1:reviewTime::STRING,4),
LEFT($1:reviewTime::STRING,2),
CASE WHEN SUBSTRING($1:reviewTime::STRING,5,1)=','
THEN SUBSTRING($1:reviewTime::STRING,4,1) ELSE SUBSTRING($1:reviewTime::STRING,4,2) END),
$1:reviewerID::STRING,
$1:reviewerName::STRING,
$1:summary::STRING,
DATE($1:unixReviewTime::int) Reviewtime
FROM @MANAGE_DB.external_stages.json_folder)
// Validate results
SELECT * FROM OUR_FIRST_DB.PUBLIC.reviews
Loading from Azure >> Create Integration:
USE DATABASE DEMO_DB;
-- create integration object that contains the access information
CREATE STORAGE INTEGRATION azure_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = AZURE
ENABLED = TRUE
AZURE_TENANT_ID = '9ecede0b-0e07-4da4-8047-e0672d6e403e'
STORAGE_ALLOWED_LOCATIONS = ('azure://storageaccountsnow.blob.core.windows.net/snowflakecsv', 'azure://storageaccountsnow.blob.core.windows.net/snowflakejson');
-- Describe integration object to provide access
DESC STORAGE integration azure_integration;
Create Stage:
---- Create file format & stage objects ----
-- create file format
create or replace file format demo_db.public.fileformat_azure
TYPE = CSV
FIELD_DELIMITER = ','
SKIP_HEADER = 1;
-- create stage object
create or replace stage demo_db.public.stage_azure
STORAGE_INTEGRATION = azure_integration
URL = 'azure://storageaccountsnow.blob.core.windows.net/snowflakecsv'
FILE_FORMAT = fileformat_azure;
-- list files
LIST @demo_db.public.stage_azure;
Load CSV:
---- Query files & Load data ----
--query files
SELECT
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12,
$13,
$14,
$15,
$16,
$17,
$18,
$19,
$20
FROM @demo_db.public.stage_azure;
create or replace table happiness (
country_name varchar,
regional_indicator varchar,
ladder_score number(4,3),
standard_error number(4,3),
upperwhisker number(4,3),
lowerwhisker number(4,3),
logged_gdp number(5,3),
social_support number(4,3),
healthy_life_expectancy number(5,3),
freedom_to_make_life_choices number(4,3),
generosity number(4,3),
perceptions_of_corruption number(4,3),
ladder_score_in_dystopia number(4,3),
explained_by_log_gpd_per_capita number(4,3),
explained_by_social_support number(4,3),
explained_by_healthy_life_expectancy number(4,3),
explained_by_freedom_to_make_life_choices number(4,3),
explained_by_generosity number(4,3),
explained_by_perceptions_of_corruption number(4,3),
dystopia_residual number (4,3));
COPY INTO HAPPINESS
FROM @demo_db.public.stage_azure;
SELECT * FROM HAPPINESS;
Load JSON:
--- Load JSON ----
create or replace file format demo_db.public.fileformat_azure_json
TYPE = JSON;
create or replace stage demo_db.public.stage_azure
STORAGE_INTEGRATION = azure_integration
URL = 'azure://storageaccountsnow.blob.core.windows.net/snowflakejson'
FILE_FORMAT = fileformat_azure_json;
LIST @demo_db.public.stage_azure;
-- Query from stage
SELECT * FROM @demo_db.public.stage_azure;
-- Query one attribute/column
SELECT $1:"Car Model" FROM @demo_db.public.stage_azure;
-- Convert data type
SELECT $1:"Car Model"::STRING FROM @demo_db.public.stage_azure;
-- Query all attributes
SELECT
$1:"Car Model"::STRING,
$1:"Car Model Year"::INT,
$1:"car make"::STRING,
$1:"first_name"::STRING,
$1:"last_name"::STRING
FROM @demo_db.public.stage_azure;
-- Query all attributes and use aliases
SELECT
$1:"Car Model"::STRING as car_model,
$1:"Car Model Year"::INT as car_model_year,
$1:"car make"::STRING as "car make",
$1:"first_name"::STRING as first_name,
$1:"last_name"::STRING as last_name
FROM @demo_db.public.stage_azure;
Create or replace table car_owner (
car_model varchar,
car_model_year int,
car_make varchar,
first_name varchar,
last_name varchar)
COPY INTO car_owner
FROM
(SELECT
$1:"Car Model"::STRING as car_model,
$1:"Car Model Year"::INT as car_model_year,
$1:"car make"::STRING as "car make",
$1:"first_name"::STRING as first_name,
$1:"last_name"::STRING as last_name
FROM @demo_db.public.stage_azure);
SELECT * FROM CAR_OWNER;
-- Alternative: Using a raw file table step
truncate table car_owner;
select * from car_owner;
create or replace table car_owner_raw (
raw variant);
COPY INTO car_owner_raw
FROM @demo_db.public.stage_azure;
SELECT * FROM car_owner_raw;
INSERT INTO car_owner
(SELECT
$1:"Car Model"::STRING as car_model,
$1:"Car Model Year"::INT as car_model_year,
$1:"car make"::STRING as car_make,
$1:"first_name"::STRING as first_name,
$1:"last_name"::STRING as last_name
FROM car_owner_raw)
select * from car_owner;
Loading from GCP >> Create Integration Object:
-- create integration object that contains the access information
CREATE STORAGE INTEGRATION gcp_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/path', 'gcs://bucket/path2');
-- Describe integration object to provide access
DESC STORAGE integration gcp_integration;
Create Stage:
-- create file format
create or replace file format demo_db.public.fileformat_gcp
TYPE = CSV
FIELD_DELIMITER = ','
SKIP_HEADER = 1;
-- create stage object
create or replace stage demo_db.public.stage_gcp
STORAGE_INTEGRATION = gcp_integration
URL = 'gcs://snowflakebucketgcp'
FILE_FORMAT = fileformat_gcp;
LIST @demo_db.public.stage_gcp;
Query & Load Data:
---- Query files & Load data ----
--query files
SELECT
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,
$12,$13,$14,$15,$16,$17,$18,$19,$20
FROM @demo_db.public.stage_gcp;
create or replace table happiness (
country_name varchar,
regional_indicator varchar,
ladder_score number(4,3),
standard_error number(4,3),
upperwhisker number(4,3),
lowerwhisker number(4,3),
logged_gdp number(5,3),
social_support number(4,3),
healthy_life_expectancy number(5,3),
freedom_to_make_life_choices number(4,3),
generosity number(4,3),
perceptions_of_corruption number(4,3),
ladder_score_in_dystopia number(4,3),
explained_by_log_gpd_per_capita number(4,3),
explained_by_social_support number(4,3),
explained_by_healthy_life_expectancy number(4,3),
explained_by_freedom_to_make_life_choices number(4,3),
explained_by_generosity number(4,3),
explained_by_perceptions_of_corruption number(4,3),
dystopia_residual number (4,3));
COPY INTO HAPPINESS
FROM @demo_db.public.stage_gcp;
SELECT * FROM HAPPINESS;
Unload data:
------- Unload data -----
USE ROLE ACCOUNTADMIN;
USE DATABASE DEMO_DB;
-- create integration object that contains the access information
CREATE STORAGE INTEGRATION gcp_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://snowflakebucketgcp', 'gcs://snowflakebucketgcpjson');
-- create file format
create or replace file format demo_db.public.fileformat_gcp
TYPE = CSV
FIELD_DELIMITER = ','
SKIP_HEADER = 1;
-- create stage object
create or replace stage demo_db.public.stage_gcp
STORAGE_INTEGRATION = gcp_integration
URL = 'gcs://snowflakebucketgcp/csv_happiness'
FILE_FORMAT = fileformat_gcp
;
ALTER STORAGE INTEGRATION gcp_integration
SET storage_allowed_locations=('gcs://snowflakebucketgcp', 'gcs://snowflakebucketgcpjson')
SELECT * FROM HAPPINESS;
COPY INTO @stage_gcp
FROM
HAPPINESS;
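COPY INTO a stage also accepts a file format plus unload-specific options; a sketch with a few common ones (prefix and option values are illustrative):
COPY INTO @stage_gcp/happiness/
FROM HAPPINESS
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' COMPRESSION = GZIP)
HEADER = TRUE
OVERWRITE = TRUE
SINGLE = FALSE;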
Snowpipe >> Create stage and Pipe:
// Create table first
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.employees (
id INT,
first_name STRING,
last_name STRING,
email STRING,
location STRING,
department STRING
)
// Create file format object
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL','null')
empty_field_as_null = TRUE;
// Create stage object with integration object & file format object
CREATE OR REPLACE stage MANAGE_DB.external_stages.csv_folder
URL = 's3://snowflakes3bucket123/csv/snowpipe'
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = MANAGE_DB.file_formats.csv_fileformat
// Create stage object with integration object & file format object
LIST @MANAGE_DB.external_stages.csv_folder
// Create schema to keep things organized
CREATE OR REPLACE SCHEMA MANAGE_DB.pipes
// Define pipe
CREATE OR REPLACE pipe MANAGE_DB.pipes.employee_pipe
auto_ingest = TRUE
AS
COPY INTO OUR_FIRST_DB.PUBLIC.employees
FROM @MANAGE_DB.external_stages.csv_folder
// Describe pipe
DESC pipe employee_pipe
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees
Error Handling:
// Handling errors
// Create file format object
CREATE OR REPLACE file format MANAGE_DB.file_formats.csv_fileformat
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL','null')
empty_field_as_null = TRUE;
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees
ALTER PIPE employee_pipe refresh
// Validate pipe is actually working
SELECT SYSTEM$PIPE_STATUS('employee_pipe')
// Snowpipe error message
SELECT * FROM TABLE(VALIDATE_PIPE_LOAD(
PIPE_NAME => 'MANAGE_DB.pipes.employee_pipe',
START_TIME => DATEADD(HOUR,-2,CURRENT_TIMESTAMP())))
// COPY command history from table to see error message
SELECT * FROM TABLE (INFORMATION_SCHEMA.COPY_HISTORY(
table_name => 'OUR_FIRST_DB.PUBLIC.EMPLOYEES',
START_TIME =>DATEADD(HOUR,-2,CURRENT_TIMESTAMP())))
Manage Pipes:
-- Manage pipes --
DESC pipe MANAGE_DB.pipes.employee_pipe;
SHOW PIPES;
SHOW PIPES like '%employee%'
SHOW PIPES in database MANAGE_DB
SHOW PIPES in schema MANAGE_DB.pipes
SHOW PIPES like '%employee%' in Database MANAGE_DB
-- Changing pipe (alter stage or file format) --
// Preparation table first
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.employees2 (
id INT,
first_name STRING,
last_name STRING,
email STRING,
location STRING,
department STRING
)
// Pause pipe
ALTER PIPE MANAGE_DB.pipes.employee_pipe SET PIPE_EXECUTION_PAUSED = true
// Verify pipe is paused and has pendingFileCount 0
SELECT SYSTEM$PIPE_STATUS('MANAGE_DB.pipes.employee_pipe')
// Recreate the pipe to change the COPY statement in the definition
CREATE OR REPLACE pipe MANAGE_DB.pipes.employee_pipe
auto_ingest = TRUE
AS
COPY INTO OUR_FIRST_DB.PUBLIC.employees2
FROM @MANAGE_DB.external_stages.csv_folder
ALTER PIPE MANAGE_DB.pipes.employee_pipe refresh
// List files in stage
LIST @MANAGE_DB.external_stages.csv_folder
SELECT * FROM OUR_FIRST_DB.PUBLIC.employees2
// Reload files manually that were already in the bucket
COPY INTO OUR_FIRST_DB.PUBLIC.employees2
FROM @MANAGE_DB.external_stages.csv_folder
// Resume pipe
ALTER PIPE MANAGE_DB.pipes.employee_pipe SET PIPE_EXECUTION_PAUSED = false
// Verify pipe is running again
SELECT SYSTEM$PIPE_STATUS('MANAGE_DB.pipes.employee_pipe')
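If a pipe has been paused long enough to become stale, resuming it may require the force-resume system function instead of the ALTER above; a sketch:
SELECT SYSTEM$PIPE_FORCE_RESUME('MANAGE_DB.pipes.employee_pipe');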
Time Travel >> Using Time Travel:
// Setting up table
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string)
CREATE OR REPLACE FILE FORMAT MANAGE_DB.file_formats.csv_file
type = csv
field_delimiter = ','
skip_header = 1
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.time_travel_stage
URL = 's3://data-snowflake-fundamentals/time-travel/'
file_format = MANAGE_DB.file_formats.csv_file;
LIST @MANAGE_DB.external_stages.time_travel_stage
COPY INTO OUR_FIRST_DB.public.test
from @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv')
SELECT * FROM OUR_FIRST_DB.public.test
// Use-case: Update data (by mistake)
UPDATE OUR_FIRST_DB.public.test
SET FIRST_NAME = 'Joyen'
// // // Using time travel: Method 1 - 2 minutes back
SELECT * FROM OUR_FIRST_DB.public.test at (OFFSET => -60*1.5)
// // // Using time travel: Method 2 - before timestamp
SELECT * FROM OUR_FIRST_DB.public.test before (timestamp => '2021-04-15 17:47:50.581'::timestamp)
-- Setting up table
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
COPY INTO OUR_FIRST_DB.public.test
from @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv');
SELECT * FROM OUR_FIRST_DB.public.test;
-- Timestamp noted at this point: 2021-04-17 08:16:24.259
-- Setting up UTC time for convenience
ALTER SESSION SET TIMEZONE ='UTC'
SELECT DATEADD(DAY, 1, CURRENT_TIMESTAMP)
UPDATE OUR_FIRST_DB.public.test
SET Job = 'Data Scientist'
SELECT * FROM OUR_FIRST_DB.public.test;
SELECT * FROM OUR_FIRST_DB.public.test before (timestamp => '2021-04-16 07:30:47.145'::timestamp)
// // // Using time travel: Method 3 - before Query ID
// Preparing table
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test (
id int,
first_name string,
last_name string,
email string,
gender string,
Phone string,
Job string)
COPY INTO OUR_FIRST_DB.public.test
from @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv')
SELECT * FROM OUR_FIRST_DB.public.test
// Altering table (by mistake)
UPDATE OUR_FIRST_DB.public.test
SET EMAIL = null
SELECT * FROM OUR_FIRST_DB.public.test
SELECT * FROM OUR_FIRST_DB.public.test before (statement => '019b9ee5-0500-8473-0043-4d8300073062')
Restoring in time travel:
// Setting up table
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
COPY INTO OUR_FIRST_DB.public.test
from @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv');
SELECT * FROM OUR_FIRST_DB.public.test;
// Use-case: Update data (by mistake)
UPDATE OUR_FIRST_DB.public.test
SET LAST_NAME = 'Tyson';
UPDATE OUR_FIRST_DB.public.test
SET JOB = 'Data Analyst';
SELECT * FROM OUR_FIRST_DB.public.test before (statement => '019b9eea-0500-845a-0043-4d830007402a')
// // // Bad method
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test as
SELECT * FROM OUR_FIRST_DB.public.test before (statement => '019b9eea-0500-845a-0043-4d830007402a')
SELECT * FROM OUR_FIRST_DB.public.test
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test as
SELECT * FROM OUR_FIRST_DB.public.test before (statement => '019b9eea-0500-8473-0043-4d830007307a')
// // // Good method
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.test_backup as
SELECT * FROM OUR_FIRST_DB.public.test before (statement => '019b9ef0-0500-8473-0043-4d830007309a')
TRUNCATE OUR_FIRST_DB.public.test
INSERT INTO OUR_FIRST_DB.public.test
SELECT * FROM OUR_FIRST_DB.public.test_backup
SELECT * FROM OUR_FIRST_DB.public.test
Undrop tables:
// Setting up table
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.time_travel_stage
URL = 's3://data-snowflake-fundamentals/time-travel/'
file_format = MANAGE_DB.file_formats.csv_file;
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.customers (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
COPY INTO OUR_FIRST_DB.public.customers
from @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv');
SELECT * FROM OUR_FIRST_DB.public.customers;
// UNDROP command - Tables
DROP TABLE OUR_FIRST_DB.public.customers;
SELECT * FROM OUR_FIRST_DB.public.customers;
UNDROP TABLE OUR_FIRST_DB.public.customers;
// UNDROP command - Schemas
DROP SCHEMA OUR_FIRST_DB.public;
SELECT * FROM OUR_FIRST_DB.public.customers;
UNDROP SCHEMA OUR_FIRST_DB.public;
// UNDROP command - Database
DROP DATABASE OUR_FIRST_DB;
SELECT * FROM OUR_FIRST_DB.public.customers;
UNDROP DATABASE OUR_FIRST_DB;
// Restore replaced table
UPDATE OUR_FIRST_DB.public.customers
SET LAST_NAME = 'Tyson';
UPDATE OUR_FIRST_DB.public.customers
SET JOB = 'Data Analyst';
// // // Undropping a table with a name that already exists
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.customers as
SELECT * FROM OUR_FIRST_DB.public.customers before (statement => '019b9f7c-0500-851b-0043-4d83000762be')
SELECT * FROM OUR_FIRST_DB.public.customers
UNDROP table OUR_FIRST_DB.public.customers;
ALTER TABLE OUR_FIRST_DB.public.customers
RENAME TO OUR_FIRST_DB.public.customers_wrong;
DESC table OUR_FIRST_DB.public.customers
Time travel cost:
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.STORAGE_USAGE ORDER BY USAGE_DATE DESC;
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS;
// Query time travel storage
SELECT ID,
TABLE_NAME,
TABLE_SCHEMA,
TABLE_CATALOG,
ACTIVE_BYTES / (1024*1024*1024) AS STORAGE_USED_GB,
TIME_TRAVEL_BYTES / (1024*1024*1024) AS TIME_TRAVEL_STORAGE_USED_GB
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
ORDER BY STORAGE_USED_GB DESC,TIME_TRAVEL_STORAGE_USED_GB DESC;
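Since Time Travel storage is driven by the retention period, lowering it on large, frequently changing tables is the main cost lever; a sketch on the table used above (the value is just an example):
ALTER TABLE OUR_FIRST_DB.public.test
SET DATA_RETENTION_TIME_IN_DAYS = 1;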
Fail Safe >> Fail Safe Storage:
// Storage usage on account level
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.STORAGE_USAGE ORDER BY USAGE_DATE DESC;
// Storage usage on account level formatted
SELECT USAGE_DATE,
STORAGE_BYTES / (1024*1024*1024) AS STORAGE_GB,
STAGE_BYTES / (1024*1024*1024) AS STAGE_GB,
FAILSAFE_BYTES / (1024*1024*1024) AS FAILSAFE_GB
FROM SNOWFLAKE.ACCOUNT_USAGE.STORAGE_USAGE ORDER BY USAGE_DATE DESC;
// Storage usage on table level
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS;
// Storage usage on table level formatted
SELECT ID,
TABLE_NAME,
TABLE_SCHEMA,
ACTIVE_BYTES / (1024*1024*1024) AS STORAGE_USED_GB,
TIME_TRAVEL_BYTES / (1024*1024*1024) AS TIME_TRAVEL_STORAGE_USED_GB,
FAILSAFE_BYTES / (1024*1024*1024) AS FAILSAFE_STORAGE_USED_GB
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
ORDER BY FAILSAFE_STORAGE_USED_GB DESC;
Types of Tables >> Permanent Tables:
CREATE OR REPLACE DATABASE PDB;
CREATE OR REPLACE TABLE PDB.public.customers (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
CREATE OR REPLACE TABLE PDB.public.helper (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
// Stage and file format
CREATE OR REPLACE FILE FORMAT MANAGE_DB.file_formats.csv_file
type = csv
field_delimiter = ','
skip_header = 1
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.time_travel_stage
URL = 's3://data-snowflake-fundamentals/time-travel/'
file_format = MANAGE_DB.file_formats.csv_file;
LIST @MANAGE_DB.external_stages.time_travel_stage;
// Copy data and insert in table
COPY INTO PDB.public.helper
FROM @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv');
SELECT * FROM PDB.public.helper;
INSERT INTO PDB.public.customers
SELECT
t1.ID
,t1.FIRST_NAME
,t1.LAST_NAME
,t1.EMAIL
,t1.GENDER
,t1.JOB
,t1.PHONE
FROM PDB.public.helper t1
CROSS JOIN (SELECT * FROM PDB.public.helper) t2
CROSS JOIN (SELECT TOP 100 * FROM PDB.public.helper) t3;
// Show table and validate
SHOW TABLES;
// Permanent tables
USE OUR_FIRST_DB
CREATE OR REPLACE TABLE customers (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
CREATE OR REPLACE DATABASE PDB;
SHOW DATABASES;
SHOW TABLES;
// View table metrics (takes a bit to appear)
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
SELECT ID,
TABLE_NAME,
TABLE_SCHEMA,
TABLE_CATALOG,
ACTIVE_BYTES / (1024*1024*1024) AS ACTIVE_STORAGE_USED_GB,
TIME_TRAVEL_BYTES / (1024*1024*1024) AS TIME_TRAVEL_STORAGE_USED_GB,
FAILSAFE_BYTES / (1024*1024*1024) AS FAILSAFE_STORAGE_USED_GB,
IS_TRANSIENT,
DELETED,
TABLE_CREATED,
TABLE_DROPPED,
TABLE_ENTERED_FAILSAFE
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
--WHERE TABLE_CATALOG ='PDB'
WHERE TABLE_DROPPED is not null
ORDER BY FAILSAFE_BYTES DESC;
Transient tables:
CREATE OR REPLACE DATABASE TDB;
CREATE OR REPLACE TRANSIENT TABLE TDB.public.customers_transient (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
INSERT INTO TDB.public.customers_transient
SELECT t1.* FROM OUR_FIRST_DB.public.customers t1
CROSS JOIN (SELECT * FROM OUR_FIRST_DB.public.customers) t2
SHOW TABLES;
// Query storage
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
SELECT ID,
TABLE_NAME,
TABLE_SCHEMA,
TABLE_CATALOG,
ACTIVE_BYTES,
TIME_TRAVEL_BYTES / (1024*1024*1024) AS TIME_TRAVEL_STORAGE_USED_GB,
FAILSAFE_BYTES / (1024*1024*1024) AS FAILSAFE_STORAGE_USED_GB,
IS_TRANSIENT,
DELETED,
TABLE_CREATED,
TABLE_DROPPED,
TABLE_ENTERED_FAILSAFE
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
WHERE TABLE_CATALOG ='TDB'
ORDER BY TABLE_CREATED DESC;
// Set retention time to 0
ALTER TABLE TDB.public.customers_transient
SET DATA_RETENTION_TIME_IN_DAYS = 0
DROP TABLE TDB.public.customers_transient;
UNDROP TABLE TDB.public.customers_transient;
SHOW TABLES;
// Creating transient schema and then table
CREATE OR REPLACE TRANSIENT SCHEMA TRANSIENT_SCHEMA;
SHOW SCHEMAS;
CREATE OR REPLACE TABLE TDB.TRANSIENT_SCHEMA.new_table (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
ALTER TABLE TDB.TRANSIENT_SCHEMA.new_table
SET DATA_RETENTION_TIME_IN_DAYS = 2
SHOW TABLES;
Temporary tables:
USE DATABASE PDB;
// Create permanent table
CREATE OR REPLACE TABLE PDB.public.customers (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
INSERT INTO PDB.public.customers
SELECT t1.* FROM OUR_FIRST_DB.public.customers t1
SELECT * FROM PDB.public.customers
// Create temporary table (with the same name)
CREATE OR REPLACE TEMPORARY TABLE PDB.public.customers (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
// Validate temporary table is the active table
SELECT * FROM PDB.public.customers;
// Create second temporary table (with a new name)
CREATE OR REPLACE TEMPORARY TABLE PDB.public.temp_table (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
// Insert data in the new table
INSERT INTO PDB.public.temp_table
SELECT * FROM PDB.public.customers
SELECT * FROM PDB.public.temp_table
SHOW TABLES;
Zero-Copy Cloning >> Cloning tables:
// Cloning
SELECT * FROM OUR_FIRST_DB.PUBLIC.CUSTOMERS
CREATE TABLE OUR_FIRST_DB.PUBLIC.CUSTOMERS_CLONE
CLONE OUR_FIRST_DB.PUBLIC.CUSTOMERS
// Validate the data
SELECT * FROM OUR_FIRST_DB.PUBLIC.CUSTOMERS_CLONE
// Update cloned table
UPDATE OUR_FIRST_DB.public.CUSTOMERS_CLONE
SET LAST_NAME = NULL
SELECT * FROM OUR_FIRST_DB.PUBLIC.CUSTOMERS
SELECT * FROM OUR_FIRST_DB.PUBLIC.CUSTOMERS_CLONE
// Cloning a temporary table is not possible
CREATE OR REPLACE TEMPORARY TABLE OUR_FIRST_DB.PUBLIC.TEMP_TABLE(
id int)
CREATE TEMPORARY TABLE OUR_FIRST_DB.PUBLIC.TABLE_COPY
CLONE OUR_FIRST_DB.PUBLIC.TEMP_TABLE
SELECT * FROM OUR_FIRST_DB.PUBLIC.TABLE_COPY
Cloning Schemas & Databases:
// Cloning Schema
CREATE TRANSIENT SCHEMA OUR_FIRST_DB.COPIED_SCHEMA
CLONE OUR_FIRST_DB.PUBLIC;
SELECT * FROM COPIED_SCHEMA.CUSTOMERS
CREATE TRANSIENT SCHEMA OUR_FIRST_DB.EXTERNAL_STAGES_COPIED
CLONE MANAGE_DB.EXTERNAL_STAGES;
// Cloning Database
CREATE TRANSIENT DATABASE OUR_FIRST_DB_COPY
CLONE OUR_FIRST_DB;
DROP DATABASE OUR_FIRST_DB_COPY
DROP SCHEMA OUR_FIRST_DB.EXTERNAL_STAGES_COPIED
DROP SCHEMA OUR_FIRST_DB.COPIED_SCHEMA
Cloning using time travel:
// Cloning using time travel
// Setting up table
CREATE OR REPLACE TABLE OUR_FIRST_DB.public.time_travel (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
CREATE OR REPLACE FILE FORMAT MANAGE_DB.file_formats.csv_file
type = csv
field_delimiter = ','
skip_header = 1;
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.time_travel_stage
URL = 's3://data-snowflake-fundamentals/time-travel/'
file_format = MANAGE_DB.file_formats.csv_file;
LIST @MANAGE_DB.external_stages.time_travel_stage;
COPY INTO OUR_FIRST_DB.public.time_travel
from @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv');
SELECT * FROM OUR_FIRST_DB.public.time_travel;
// Update data
UPDATE OUR_FIRST_DB.public.time_travel
SET FIRST_NAME = 'Frank';
// Using time travel
SELECT * FROM OUR_FIRST_DB.public.time_travel at (OFFSET => -60*1);
// Using time travel
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.time_travel_clone
CLONE OUR_FIRST_DB.public.time_travel at (OFFSET => -60*1.5);
SELECT * FROM OUR_FIRST_DB.PUBLIC.time_travel_clone;
// Update data again
UPDATE OUR_FIRST_DB.public.time_travel_clone
SET JOB = 'Snowflake Analyst';
// Using time travel: Method 2 - before Query
SELECT * FROM OUR_FIRST_DB.public.time_travel_clone before (statement => '<query-id>');
CREATE OR REPLACE TABLE OUR_FIRST_DB.PUBLIC.time_travel_clone_of_clone
CLONE OUR_FIRST_DB.public.time_travel_clone before (statement => '<query-id>');
SELECT * FROM OUR_FIRST_DB.public.time_travel_clone_of_clone;
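The BEFORE (statement => ...) variants above need the query ID of the UPDATE that should be rolled back past. One way to look it up is the QUERY_HISTORY table function (a sketch; the ILIKE filter text is only an assumption about which statement is being searched for):
// Find the query ID to paste into BEFORE (statement => '<query-id>')
SELECT QUERY_ID, QUERY_TEXT, START_TIME
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE QUERY_TEXT ILIKE 'UPDATE OUR_FIRST_DB.public.time_travel%'
ORDER BY START_TIME DESC;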
Data Sharing >> Using Data Sharing:
CREATE OR REPLACE DATABASE DATA_S;
CREATE OR REPLACE STAGE aws_stage
url='s3://bucketsnowflakes3';
// List files in stage
LIST @aws_stage;
// Create table
CREATE OR REPLACE TABLE ORDERS (
ORDER_ID VARCHAR(30)
,AMOUNT NUMBER(38,0)
,PROFIT NUMBER(38,0)
,QUANTITY NUMBER(38,0)
,CATEGORY VARCHAR(30)
,SUBCATEGORY VARCHAR(30));
// Load data using copy command
COPY INTO ORDERS
FROM @MANAGE_DB.external_stages.aws_stage
file_format= (type = csv field_delimiter=',' skip_header=1)
pattern='.OrderDetails.';
SELECT * FROM ORDERS;
// Create a share object
CREATE OR REPLACE SHARE ORDERS_SHARE;
---- Setup Grants ----
// Grant usage on database
GRANT USAGE ON DATABASE DATA_S TO SHARE ORDERS_SHARE;
// Grant usage on schema
GRANT USAGE ON SCHEMA DATA_S.PUBLIC TO SHARE ORDERS_SHARE;
// Grant SELECT on table
GRANT SELECT ON TABLE DATA_S.PUBLIC.ORDERS TO SHARE ORDERS_SHARE;
// Validate Grants
SHOW GRANTS TO SHARE ORDERS_SHARE;
---- Add Consumer Account ----
ALTER SHARE ORDERS_SHARE ADD ACCOUNT = <consumer-account-identifier>;
Create Reader Account:
-- Create Reader Account --
CREATE MANAGED ACCOUNT tech_joy_account
ADMIN_NAME = tech_joy_admin,
ADMIN_PASSWORD = 'set-pwd',
TYPE = READER;
// Make sure to have selected the role of accountadmin
// Show accounts
SHOW MANAGED ACCOUNTS;
-- Share the data --
ALTER SHARE ORDERS_SHARE
ADD ACCOUNT = <reader-account-identifier>;
ALTER SHARE ORDERS_SHARE
ADD ACCOUNT = <reader-account-identifier>
SHARE_RESTRICTIONS=false;
-- Create database from share --
// Show all shares (consumer & producers)
SHOW SHARES;
// See details on share
DESC SHARE QNA46172.ORDERS_SHARE;
// Create a database in consumer account using the share
CREATE DATABASE DATA_SHARE_DB FROM SHARE QNA46172.ORDERS_SHARE;
// Validate table access
SELECT * FROM DATA_SHARE_DB.PUBLIC.ORDERS;
// Setup virtual warehouse
CREATE WAREHOUSE READ_WH WITH
WAREHOUSE_SIZE='X-SMALL'
AUTO_SUSPEND = 180
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;
-- Create and set up users --
// Create user
CREATE USER MYRIAM PASSWORD = 'difficult_passw@ord=123';
// Grant usage on warehouse
GRANT USAGE ON WAREHOUSE READ_WH TO ROLE PUBLIC;
// Granting privileges on a shared database for other users
GRANT IMPORTED PRIVILEGES ON DATABASE DATA_SHARE_DB TO ROLE PUBLIC;
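Optionally, the reader-account user can be pointed at the warehouse by default so queries run without further setup (an optional step, assuming the user and warehouse names above):
// Set a default warehouse for the reader-account user
ALTER USER MYRIAM SET DEFAULT_WAREHOUSE = READ_WH;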
Share Entire DB or Schema:
SHOW SHARES;
// Create share object
CREATE OR REPLACE SHARE COMPLETE_SCHEMA_SHARE;
// Grant usage on database & schema
GRANT USAGE ON DATABASE OUR_FIRST_DB TO SHARE COMPLETE_SCHEMA_SHARE;
GRANT USAGE ON SCHEMA OUR_FIRST_DB.PUBLIC TO SHARE COMPLETE_SCHEMA_SHARE;
// Grant select on all tables
GRANT SELECT ON ALL TABLES IN SCHEMA OUR_FIRST_DB.PUBLIC TO SHARE COMPLETE_SCHEMA_SHARE;
GRANT SELECT ON ALL TABLES IN DATABASE OUR_FIRST_DB TO SHARE COMPLETE_SCHEMA_SHARE;
// Add account to share
ALTER SHARE COMPLETE_SCHEMA_SHARE
ADD ACCOUNT=KAA74702;
// Updating data
UPDATE OUR_FIRST_DB.PUBLIC.ORDERS
SET PROFIT=0 WHERE PROFIT < 0;
// Add new table
CREATE TABLE OUR_FIRST_DB.PUBLIC.NEW_TABLE (ID int);
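Note that GRANT SELECT ON ALL TABLES only covers tables that exist at the time of the grant; a table created afterwards, like NEW_TABLE above, has to be granted to the share explicitly (a follow-up step, assuming the object names above):
// Make the new table visible to the share as well
GRANT SELECT ON TABLE OUR_FIRST_DB.PUBLIC.NEW_TABLE TO SHARE COMPLETE_SCHEMA_SHARE;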
Secure View:
-- Create database & table --
CREATE OR REPLACE DATABASE CUSTOMER_DB;
CREATE OR REPLACE TABLE CUSTOMER_DB.public.customers (
id int,
first_name string,
last_name string,
email string,
gender string,
Job string,
Phone string);
// Stage and file format
CREATE OR REPLACE FILE FORMAT MANAGE_DB.file_formats.csv_file
type = csv
field_delimiter = ','
skip_header = 1;
CREATE OR REPLACE STAGE MANAGE_DB.external_stages.time_travel_stage
URL = 's3://data-snowflake-fundamentals/time-travel/'
file_format = MANAGE_DB.file_formats.csv_file;
LIST @MANAGE_DB.external_stages.time_travel_stage;
// Copy data and insert in table
COPY INTO CUSTOMER_DB.public.customers
FROM @MANAGE_DB.external_stages.time_travel_stage
files = ('customers.csv');
SELECT * FROM CUSTOMER_DB.PUBLIC.CUSTOMERS;
-- Create VIEW --
CREATE OR REPLACE VIEW CUSTOMER_DB.PUBLIC.CUSTOMER_VIEW AS
SELECT
FIRST_NAME,
LAST_NAME,
EMAIL
FROM CUSTOMER_DB.PUBLIC.CUSTOMERS
WHERE JOB != 'DATA SCIENTIST';
-- Grant usage & SELECT --
GRANT USAGE ON DATABASE CUSTOMER_DB TO ROLE PUBLIC;
GRANT USAGE ON SCHEMA CUSTOMER_DB.PUBLIC TO ROLE PUBLIC;
GRANT SELECT ON TABLE CUSTOMER_DB.PUBLIC.CUSTOMERS TO ROLE PUBLIC;
GRANT SELECT ON VIEW CUSTOMER_DB.PUBLIC.CUSTOMER_VIEW TO ROLE PUBLIC;
SHOW VIEWS LIKE '%CUSTOMER%';
-- Create SECURE VIEW --
CREATE OR REPLACE SECURE VIEW CUSTOMER_DB.PUBLIC.CUSTOMER_VIEW_SECURE AS
SELECT
FIRST_NAME,
LAST_NAME,
EMAIL
FROM CUSTOMER_DB.PUBLIC.CUSTOMERS
WHERE JOB != 'DATA SCIENTIST';
GRANT SELECT ON VIEW CUSTOMER_DB.PUBLIC.CUSTOMER_VIEW_SECURE TO ROLE PUBLIC;
SHOW VIEWS LIKE '%CUSTOMER%';
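For any role other than the owner, the definition of a secure view is hidden, which is easy to verify by switching roles (a quick check, assuming ACCOUNTADMIN is the working role to switch back to):
// As a non-owner role the text column of the secure view is empty
USE ROLE PUBLIC;
SHOW VIEWS LIKE '%CUSTOMER%';
USE ROLE ACCOUNTADMIN;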
Sharing Views:
SHOW SHARES;
// Create share object
CREATE OR REPLACE SHARE VIEW_SHARE;
// Grant usage on database & schema
GRANT USAGE ON DATABASE CUSTOMER_DB TO SHARE VIEW_SHARE;
GRANT USAGE ON SCHEMA CUSTOMER_DB.PUBLIC TO SHARE VIEW_SHARE;
// Grant select on view
GRANT SELECT ON VIEW CUSTOMER_DB.PUBLIC.CUSTOMER_VIEW TO SHARE VIEW_SHARE;
GRANT SELECT ON VIEW CUSTOMER_DB.PUBLIC.CUSTOMER_VIEW_SECURE TO SHARE VIEW_SHARE;
// Add account to share
ALTER SHARE VIEW_SHARE
ADD ACCOUNT=KAA74702;
Data Sampling >> Data Sampling:
CREATE OR REPLACE TRANSIENT DATABASE SAMPLING_DB;
CREATE OR REPLACE VIEW ADDRESS_SAMPLE
AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_ADDRESS
SAMPLE ROW (1) SEED(27);
SELECT * FROM ADDRESS_SAMPLE;
SELECT CA_LOCATION_TYPE, COUNT(*)/3254250*100
FROM ADDRESS_SAMPLE
GROUP BY CA_LOCATION_TYPE;
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_ADDRESS
SAMPLE SYSTEM (1) SEED(23);
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_ADDRESS
SAMPLE SYSTEM (10) SEED(23);
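ROW sampling picks individual rows with the given probability, while SYSTEM sampling picks whole micro-partitions, which is cheaper on very large tables but less uniform. BERNOULLI and BLOCK are synonyms for ROW and SYSTEM respectively, e.g.:
// BERNOULLI is a synonym for ROW sampling
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_ADDRESS
SAMPLE BERNOULLI (1) SEED (23);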
Scheduling Tasks >> Creating Tasks:
CREATE OR REPLACE TRANSIENT DATABASE TASK_DB;
// Prepare table
CREATE OR REPLACE TABLE CUSTOMERS (
CUSTOMER_ID INT AUTOINCREMENT START = 1 INCREMENT =1,
FIRST_NAME VARCHAR(40) DEFAULT 'JENNIFER' ,
CREATE_DATE DATE);
// Create task
CREATE OR REPLACE TASK CUSTOMER_INSERT
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
INSERT INTO CUSTOMERS(CREATE_DATE) VALUES(CURRENT_TIMESTAMP);
SHOW TASKS;
// Task starting and suspending
ALTER TASK CUSTOMER_INSERT RESUME;
ALTER TASK CUSTOMER_INSERT SUSPEND;
SELECT * FROM CUSTOMERS;
Using CRON:
CREATE OR REPLACE TASK CUSTOMER_INSERT
WAREHOUSE = COMPUTE_WH
SCHEDULE = '60 MINUTE'
AS
INSERT INTO CUSTOMERS(CREATE_DATE) VALUES(CURRENT_TIMESTAMP);
CREATE OR REPLACE TASK CUSTOMER_INSERT
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 7,10 * * 5L UTC'
AS
INSERT INTO CUSTOMERS(CREATE_DATE) VALUES(CURRENT_TIMESTAMP);
// __________ minute (0-59)
// | ________ hour (0-23)
// | | ______ day of month (1-31, or L)
// | | | ____ month (1-12, JAN-DEC)
// | | | | __ day of week (0-6, SUN-SAT, or L)
// | | | | |
// | | | | |
// * * * * *
// Every minute
SCHEDULE = 'USING CRON * * * * * UTC'
// Every day at 6am UTC timezone
SCHEDULE = 'USING CRON 0 6 * * * UTC'
// Every hour starting at 9 AM and ending at 5 PM on Sundays
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
CREATE OR REPLACE TASK CUSTOMER_INSERT
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 9,17 * * * UTC'
AS
INSERT INTO CUSTOMERS(CREATE_DATE) VALUES(CURRENT_TIMESTAMP);
Creating tree of tasks:
USE TASK_DB;
SHOW TASKS;
SELECT * FROM CUSTOMERS;
// Prepare a second table
CREATE OR REPLACE TABLE CUSTOMERS2 (
CUSTOMER_ID INT,
FIRST_NAME VARCHAR(40),
CREATE_DATE DATE);
// Suspend parent task
ALTER TASK CUSTOMER_INSERT SUSPEND;
// Create a child task
CREATE OR REPLACE TASK CUSTOMER_INSERT2
WAREHOUSE = COMPUTE_WH
AFTER CUSTOMER_INSERT
AS
INSERT INTO CUSTOMERS2 SELECT * FROM CUSTOMERS;
// Prepare a third table
CREATE OR REPLACE TABLE CUSTOMERS3 (
CUSTOMER_ID INT,
FIRST_NAME VARCHAR(40),
CREATE_DATE DATE,
INSERT_DATE DATE DEFAULT DATE(CURRENT_TIMESTAMP));
// Create a child task
CREATE OR REPLACE TASK CUSTOMER_INSERT3
WAREHOUSE = COMPUTE_WH
AFTER CUSTOMER_INSERT2
AS
INSERT INTO CUSTOMERS3 (CUSTOMER_ID,FIRST_NAME,CREATE_DATE) SELECT * FROM CUSTOMERS2;
SHOW TASKS;
ALTER TASK CUSTOMER_INSERT
SET SCHEDULE = '1 MINUTE';
// Resume tasks (first root task)
ALTER TASK CUSTOMER_INSERT RESUME;
ALTER TASK CUSTOMER_INSERT2 RESUME;
ALTER TASK CUSTOMER_INSERT3 RESUME;
SELECT * FROM CUSTOMERS2;
SELECT * FROM CUSTOMERS3;
// Suspend tasks again
ALTER TASK CUSTOMER_INSERT SUSPEND;
ALTER TASK CUSTOMER_INSERT2 SUSPEND;
ALTER TASK CUSTOMER_INSERT3 SUSPEND;
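The task tree can be inspected with the TASK_DEPENDENTS table function, which lists the child tasks hanging off a root task (a quick check, assuming the tasks created above):
// Show all tasks that depend on the root task
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(TASK_NAME => 'CUSTOMER_INSERT', RECURSIVE => TRUE));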
Task with stored procedure:
// Create a stored procedure
USE TASK_DB;
SELECT * FROM CUSTOMERS;
CREATE OR REPLACE PROCEDURE CUSTOMERS_INSERT_PROCEDURE (CREATE_DATE varchar)
RETURNS STRING NOT NULL
LANGUAGE JAVASCRIPT
AS
$$
var sql_command = 'INSERT INTO CUSTOMERS(CREATE_DATE) VALUES(:1);'
snowflake.execute(
{
sqlText: sql_command,
binds: [CREATE_DATE]
});
return "Successfully executed.";
$$;
CREATE OR REPLACE TASK CUSTOMER_TASK_PROCEDURE
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS CALL CUSTOMERS_INSERT_PROCEDURE (CURRENT_TIMESTAMP);
SHOW TASKS;
ALTER TASK CUSTOMER_TASK_PROCEDURE RESUME;
SELECT * FROM CUSTOMERS;
Task History:
SHOW TASKS;
USE DEMO_DB;
// Use the table function "TASK_HISTORY()"
select *
from table(information_schema.task_history())
order by scheduled_time desc;
// See results for a specific Task in a given time
select *
from table(information_schema.task_history(
scheduled_time_range_start=>dateadd('hour',-4,current_timestamp()),
result_limit => 5,
task_name=>'CUSTOMER_INSERT2'));
// See results for a given time period
select *
from table(information_schema.task_history(
scheduled_time_range_start=>to_timestamp_ltz('2021-04-22 11:28:32.776 -0700'),
scheduled_time_range_end=>to_timestamp_ltz('2021-04-22 11:35:32.776 -0700')));
SELECT TO_TIMESTAMP_LTZ(CURRENT_TIMESTAMP);
Streams >> Insert:
-------------------- Stream example: INSERT ------------------------
CREATE OR REPLACE TRANSIENT DATABASE STREAMS_DB;
-- Create example table
create or replace table sales_raw_staging(
id varchar,
product varchar,
price varchar,
amount varchar,
store_id varchar);
-- insert values
insert into sales_raw_staging
values
(1,'Banana',1.99,1,1),
(2,'Lemon',0.99,1,1),
(3,'Apple',1.79,1,2),
(4,'Orange Juice',1.89,1,2),
(5,'Cereals',5.98,2,1);
create or replace table store_table(
store_id number,
location varchar,
employees number);
INSERT INTO STORE_TABLE VALUES(1,'Chicago',33);
INSERT INTO STORE_TABLE VALUES(2,'London',12);
create or replace table sales_final_table(
id int,
product varchar,
price number,
amount int,
store_id int,
location varchar,
employees int);
-- Insert into final table
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_RAW_STAGING SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
-- Create a stream object
create or replace stream sales_stream on table sales_raw_staging;
SHOW STREAMS;
DESC STREAM sales_stream;
-- Get changes on data using stream (INSERTS)
select * from sales_stream;
select * from sales_raw_staging;
-- insert values
insert into sales_raw_staging
values
(6,'Mango',1.99,1,2),
(7,'Garlic',0.99,1,1);
-- Get changes on data using stream (INSERTS)
select * from sales_stream;
select * from sales_raw_staging;
select * from sales_final_table;
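Besides the table's own columns, a stream returns metadata columns describing each captured change; these are what the MERGE statements later key on (a quick look, using the stream created above):
-- Metadata columns exposed by the stream
select METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID, id, product
from sales_stream;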
-- Consume stream object
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_STREAM SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
-- Get changes on data using stream (INSERTS)
select * from sales_stream;
-- insert values
insert into sales_raw_staging
values
(8,'Paprika',4.99,1,2),
(9,'Tomato',3.99,1,2);
-- Consume stream object
INSERT INTO sales_final_table
SELECT
SA.id,
SA.product,
SA.price,
SA.amount,
ST.STORE_ID,
ST.LOCATION,
ST.EMPLOYEES
FROM SALES_STREAM SA
JOIN STORE_TABLE ST ON ST.STORE_ID=SA.STORE_ID ;
SELECT * FROM SALES_FINAL_TABLE;
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
Update:
-- UPDATE 1 *
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Potato' WHERE PRODUCT = 'Banana';
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
using SALES_STREAM S -- Stream that has captured the changes
on f.id = s.id
when matched
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE ='TRUE' -- Indicates the record has been updated
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id;
SELECT * FROM SALES_FINAL_TABLE;
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
-- UPDATE 2 *
UPDATE SALES_RAW_STAGING
SET PRODUCT ='Green apple' WHERE PRODUCT = 'Apple';
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
using SALES_STREAM S -- Stream that has captured the changes
on f.id = s.id
when matched
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE ='TRUE' -- Indicates the record has been updated
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id;
SELECT * FROM SALES_FINAL_TABLE;
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
Delete:
-- DELETE *
SELECT * FROM SALES_FINAL_TABLE;
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Lemon';
-- Process stream *
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
using SALES_STREAM S -- Stream that has captured the changes
on f.id = s.id
when matched
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete;
Process all data changes:
-- Process UPDATE,INSERT & DELETE simultaneously *
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
SELECT * FROM SALES_FINAL_TABLE;
INSERT INTO SALES_RAW_STAGING VALUES (2,'Lemon',0.99,1,1);
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Lemonade'
WHERE PRODUCT ='Lemon';
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Lemonade';
--- Example 2 ---
INSERT INTO SALES_RAW_STAGING VALUES (10,'Lemon Juice',2.99,1,1);
UPDATE SALES_RAW_STAGING
SET PRICE = 3
WHERE PRODUCT ='Mango';
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Potato';
Streams & Tasks:
------- Automate the updates using tasks --
CREATE OR REPLACE TASK all_data_changes
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM')
AS
merge into SALES_FINAL_TABLE F -- Target table to merge changes from source table
USING ( SELECT STRE.*,ST.location,ST.employees
FROM SALES_STREAM STRE
JOIN STORE_TABLE ST
ON STRE.store_id = ST.store_id
) S
ON F.id=S.id
when matched -- DELETE condition
and S.METADATA$ACTION ='DELETE'
and S.METADATA$ISUPDATE = 'FALSE'
then delete
when matched -- UPDATE condition
and S.METADATA$ACTION ='INSERT'
and S.METADATA$ISUPDATE = 'TRUE'
then update
set f.product = s.product,
f.price = s.price,
f.amount= s.amount,
f.store_id=s.store_id
when not matched
and S.METADATA$ACTION ='INSERT'
then insert
(id,product,price,store_id,amount,employees,location)
values
(s.id, s.product,s.price,s.store_id,s.amount,s.employees,s.location);
ALTER TASK all_data_changes RESUME;
SHOW TASKS;
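The same condition the task evaluates can also be checked manually, which is handy when debugging why a run was skipped:
// Check whether the stream currently holds unconsumed changes
SELECT SYSTEM$STREAM_HAS_DATA('SALES_STREAM');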
// Change data
INSERT INTO SALES_RAW_STAGING VALUES (11,'Milk',1.99,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (12,'Chocolate',4.49,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (13,'Cheese',3.89,1,1);
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Chocolate bar'
WHERE PRODUCT ='Chocolate';
DELETE FROM SALES_RAW_STAGING
WHERE PRODUCT = 'Mango';
// Verify results
SELECT * FROM SALES_RAW_STAGING;
SELECT * FROM SALES_STREAM;
SELECT * FROM SALES_FINAL_TABLE;
// Verify the history
select *
from table(information_schema.task_history())
order by name asc,scheduled_time desc;
Types of Stream:
------- Append-only type ------
USE STREAMS_DB;
SHOW STREAMS;
SELECT * FROM SALES_RAW_STAGING;
-- Create stream with default
CREATE OR REPLACE STREAM SALES_STREAM_DEFAULT
ON TABLE SALES_RAW_STAGING;
-- Create stream with append-only
CREATE OR REPLACE STREAM SALES_STREAM_APPEND
ON TABLE SALES_RAW_STAGING
APPEND_ONLY = TRUE;
-- View streams
SHOW STREAMS;
-- Insert values
INSERT INTO SALES_RAW_STAGING VALUES (14,'Honey',4.99,1,1);
INSERT INTO SALES_RAW_STAGING VALUES (15,'Coffee',4.89,1,2);
INSERT INTO SALES_RAW_STAGING VALUES (15,'Coffee',4.89,1,2);
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;
-- Delete values
SELECT * FROM SALES_RAW_STAGING;
DELETE FROM SALES_RAW_STAGING WHERE ID=7;
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM_DEFAULT;
-- Consume stream via "CREATE TABLE … AS"
CREATE OR REPLACE TEMPORARY TABLE PRODUCT_TABLE
AS SELECT * FROM SALES_STREAM_DEFAULT;
CREATE OR REPLACE TEMPORARY TABLE PRODUCT_TABLE
AS SELECT * FROM SALES_STREAM_APPEND;
-- Update
UPDATE SALES_RAW_STAGING
SET PRODUCT = 'Coffee 200g'
WHERE PRODUCT ='Coffee';
SELECT * FROM SALES_STREAM_APPEND;
SELECT * FROM SALES_STREAM;
Change Clause:
----- Change clause ------
--- Create example db & table ---
CREATE OR REPLACE DATABASE SALES_DB;
create or replace table sales_raw(
id varchar,
product varchar,
price varchar,
amount varchar,
store_id varchar);
-- insert values
insert into sales_raw
values
(1, 'Eggs', 1.39, 1, 1),
(2, 'Baking powder', 0.99, 1, 1),
(3, 'Eggplants', 1.79, 1, 2),
(4, 'Ice cream', 1.89, 1, 2),
(5, 'Oats', 1.98, 2, 1);
ALTER TABLE sales_raw
SET CHANGE_TRACKING = TRUE;
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (offset => -0.5*60);
SELECT CURRENT_TIMESTAMP;
-- Insert values
INSERT INTO SALES_RAW VALUES (6, 'Bread', 2.99, 1, 2);
INSERT INTO SALES_RAW VALUES (7, 'Onions', 2.89, 1, 2);
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (timestamp => 'your-timestamp'::timestamp_tz);
UPDATE SALES_RAW
SET PRODUCT = 'Toast2' WHERE ID=6;
// information value
SELECT * FROM SALES_RAW
CHANGES(information => default)
AT (timestamp => 'your-timestamp'::timestamp_tz);
SELECT * FROM SALES_RAW
CHANGES(information => append_only)
AT (timestamp => 'your-timestamp'::timestamp_tz);
CREATE OR REPLACE TABLE PRODUCTS
AS
SELECT * FROM SALES_RAW
CHANGES(information => append_only)
AT (timestamp => 'your-timestamp'::timestamp_tz);
SELECT * FROM PRODUCTS;
Materialized Views >> Create Materialized View:
-- Remove caching just to have a fair test -- Part 1
ALTER SESSION SET USE_CACHED_RESULT=FALSE; -- disable global caching
ALTER warehouse compute_wh suspend;
ALTER warehouse compute_wh resume;
-- Prepare table
CREATE OR REPLACE TRANSIENT DATABASE ORDERS;
CREATE OR REPLACE SCHEMA TPCH_SF100;
CREATE OR REPLACE TABLE TPCH_SF100.ORDERS AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.ORDERS;
SELECT * FROM ORDERS LIMIT 100;
-- Example statement view --
SELECT
YEAR(O_ORDERDATE) AS YEAR,
MAX(O_COMMENT) AS MAX_COMMENT,
MIN(O_COMMENT) AS MIN_COMMENT,
MAX(O_CLERK) AS MAX_CLERK,
MIN(O_CLERK) AS MIN_CLERK
FROM ORDERS.TPCH_SF100.ORDERS
GROUP BY YEAR(O_ORDERDATE)
ORDER BY YEAR(O_ORDERDATE);
-- Create materialized view
CREATE OR REPLACE MATERIALIZED VIEW ORDERS_MV
AS
SELECT
YEAR(O_ORDERDATE) AS YEAR,
MAX(O_COMMENT) AS MAX_COMMENT,
MIN(O_COMMENT) AS MIN_COMMENT,
MAX(O_CLERK) AS MAX_CLERK,
MIN(O_CLERK) AS MIN_CLERK
FROM ORDERS.TPCH_SF100.ORDERS
GROUP BY YEAR(O_ORDERDATE);
SHOW MATERIALIZED VIEWS;
-- Query view
SELECT * FROM ORDERS_MV
ORDER BY YEAR;
-- UPDATE or DELETE values
UPDATE ORDERS
SET O_CLERK='Clerk#99900000'
WHERE O_ORDERDATE='1992-01-01';
-- Test updated data --
-- Example statement view --
SELECT
YEAR(O_ORDERDATE) AS YEAR,
MAX(O_COMMENT) AS MAX_COMMENT,
MIN(O_COMMENT) AS MIN_COMMENT,
MAX(O_CLERK) AS MAX_CLERK,
MIN(O_CLERK) AS MIN_CLERK
FROM ORDERS.TPCH_SF100.ORDERS
GROUP BY YEAR(O_ORDERDATE)
ORDER BY YEAR(O_ORDERDATE);
-- Query view
SELECT * FROM ORDERS_MV
ORDER BY YEAR;
SHOW MATERIALIZED VIEWS;
Refresh in Materialized Views:
-- Remove caching just to have a fair test -- Part 2
ALTER SESSION SET USE_CACHED_RESULT=FALSE; -- disable global caching
ALTER warehouse compute_wh suspend;
ALTER warehouse compute_wh resume;
-- Prepare table
CREATE OR REPLACE TRANSIENT DATABASE ORDERS;
CREATE OR REPLACE SCHEMA TPCH_SF100;
CREATE OR REPLACE TABLE TPCH_SF100.ORDERS AS
SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.ORDERS;
SELECT * FROM ORDERS LIMIT 100;
-- Example statement view --
SELECT
YEAR(O_ORDERDATE) AS YEAR,
MAX(O_COMMENT) AS MAX_COMMENT,
MIN(O_COMMENT) AS MIN_COMMENT,
MAX(O_CLERK) AS MAX_CLERK,
MIN(O_CLERK) AS MIN_CLERK
FROM ORDERS.TPCH_SF100.ORDERS
GROUP BY YEAR(O_ORDERDATE)
ORDER BY YEAR(O_ORDERDATE);
-- Create materialized view
CREATE OR REPLACE MATERIALIZED VIEW ORDERS_MV
AS
SELECT
YEAR(O_ORDERDATE) AS YEAR,
MAX(O_COMMENT) AS MAX_COMMENT,
MIN(O_COMMENT) AS MIN_COMMENT,
MAX(O_CLERK) AS MAX_CLERK,
MIN(O_CLERK) AS MIN_CLERK
FROM ORDERS.TPCH_SF100.ORDERS
GROUP BY YEAR(O_ORDERDATE);
SHOW MATERIALIZED VIEWS;
-- Query view
SELECT * FROM ORDERS_MV
ORDER BY YEAR;
-- UPDATE or DELETE values
UPDATE ORDERS
SET O_CLERK='Clerk#99900000'
WHERE O_ORDERDATE='1992-01-01';
-- Test updated data --
-- Example statement view --
SELECT
YEAR(O_ORDERDATE) AS YEAR,
MAX(O_COMMENT) AS MAX_COMMENT,
MIN(O_COMMENT) AS MIN_COMMENT,
MAX(O_CLERK) AS MAX_CLERK,
MIN(O_CLERK) AS MIN_CLERK
FROM ORDERS.TPCH_SF100.ORDERS
GROUP BY YEAR(O_ORDERDATE)
ORDER BY YEAR(O_ORDERDATE);
-- Query view
SELECT * FROM ORDERS_MV
ORDER BY YEAR;
SHOW MATERIALIZED VIEWS;
select * from table(information_schema.materialized_view_refresh_history());
Maintenance Costs:
SHOW MATERIALIZED VIEWS;
select * from table(information_schema.materialized_view_refresh_history());
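The refresh history can also be filtered to a single view; the CREDITS_USED column shows what the background maintenance has cost (a sketch, assuming the MATERIALIZED_VIEW_NAME argument and the ORDERS_MV view from above):
-- Maintenance history for one materialized view
select * from table(information_schema.materialized_view_refresh_history(materialized_view_name => 'ORDERS_MV'));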
Dynamic Data Masking >> Create Masking Policy:
USE DEMO_DB;
USE ROLE ACCOUNTADMIN;
-- Prepare table --
create or replace table customers(
id number,
full_name varchar,
email varchar,
phone varchar,
spent number,
create_date DATE DEFAULT CURRENT_DATE);
-- insert values in table --
insert into customers (id, full_name, email,phone,spent)
values
(1,'Lewiss MacDwyer','lmacdwyer0@un.org','262-665-9168',140),
(2,'Ty Pettingall','tpettingall1@mayoclinic.com','734-987-7120',254),
(3,'Marlee Spadazzi','mspadazzi2@txnews.com','867-946-3659',120),
(4,'Heywood Tearney','htearney3@patch.com','563-853-8192',1230),
(5,'Odilia Seti','oseti4@globo.com','730-451-8637',143),
(6,'Meggie Washtell','mwashtell5@rediff.com','568-896-6138',600);
-- set up roles
CREATE OR REPLACE ROLE ANALYST_MASKED;
CREATE OR REPLACE ROLE ANALYST_FULL;
-- grant select on table to roles
GRANT SELECT ON TABLE DEMO_DB.PUBLIC.CUSTOMERS TO ROLE ANALYST_MASKED;
GRANT SELECT ON TABLE DEMO_DB.PUBLIC.CUSTOMERS TO ROLE ANALYST_FULL;
GRANT USAGE ON SCHEMA DEMO_DB.PUBLIC TO ROLE ANALYST_MASKED;
GRANT USAGE ON SCHEMA DEMO_DB.PUBLIC TO ROLE ANALYST_FULL;
-- grant warehouse access to roles
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE ANALYST_MASKED;
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE ANALYST_FULL;
-- assign roles to a user
GRANT ROLE ANALYST_MASKED TO USER NIKOLAISCHULER;
GRANT ROLE ANALYST_FULL TO USER NIKOLAISCHULER;
-- Set up masking policy
create or replace masking policy phone as (val varchar) returns varchar ->
case
when current_role() in ('ANALYST_FULL', 'ACCOUNTADMIN') then val
else '##-###-##'
end;
-- Apply policy on a specific column
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN phone
SET MASKING POLICY PHONE;
-- Validating policies
USE ROLE ANALYST_FULL;
SELECT * FROM CUSTOMERS;
USE ROLE ANALYST_MASKED;
SELECT * FROM CUSTOMERS;
Unset & Replace Policy:
-- More examples --
USE ROLE ACCOUNTADMIN;
--- 1) Apply policy to multiple columns
-- Apply policy on a specific column
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN full_name
SET MASKING POLICY phone;
-- Apply policy on another specific column
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN phone
SET MASKING POLICY phone;
--- 2) Replace or drop policy
DROP masking policy phone;
create or replace masking policy phone as (val varchar) returns varchar ->
case
when current_role() in ('ANALYST_FULL', 'ACCOUNTADMIN') then val
else CONCAT(LEFT(val,2),'*')
end;
-- List and describe policies
DESC MASKING POLICY phone;
SHOW MASKING POLICIES;
-- Show columns with applied policies
SELECT * FROM table(information_schema.policy_references(policy_name=>'phone'));
-- Remove policy before replacing/dropping
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN full_name
UNSET MASKING POLICY;
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN email
UNSET MASKING POLICY;
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN phone
UNSET MASKING POLICY;
-- replace policy
create or replace masking policy names as (val varchar) returns varchar ->
case
when current_role() in ('ANALYST_FULL', 'ACCOUNTADMIN') then val
else CONCAT(LEFT(val,2),'*')
end;
-- apply policy
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN full_name
SET MASKING POLICY names;
-- Validating policies
USE ROLE ANALYST_FULL;
SELECT * FROM CUSTOMERS;
USE ROLE ANALYST_MASKED;
SELECT * FROM CUSTOMERS;
Alter Existing Policies:
-- Alter existing policies
USE ROLE ANALYST_MASKED;
SELECT * FROM CUSTOMERS;
USE ROLE ACCOUNTADMIN;
alter masking policy phone set body ->
case
when current_role() in ('ANALYST_FULL', 'ACCOUNTADMIN') then val
else '--**'
end;
ALTER TABLE CUSTOMERS MODIFY COLUMN email UNSET MASKING POLICY;
Real-life Examples:
More examples 1
USE ROLE ACCOUNTADMIN;
create or replace masking policy emails as (val varchar) returns varchar ->
case
when current_role() in ('ANALYST_FULL') then val
when current_role() in ('ANALYST_MASKED') then regexp_replace(val,'.+\@','@') -- leave email domain unmasked
else '***'
end;
-- apply policy
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN email
SET MASKING POLICY emails;
-- Validating policies
USE ROLE ANALYST_FULL;
SELECT * FROM CUSTOMERS;
USE ROLE ANALYST_MASKED;
SELECT * FROM CUSTOMERS;
USE ROLE ACCOUNTADMIN;
More examples 2
create or replace masking policy sha2 as (val varchar) returns varchar ->
case
when current_role() in ('ANALYST_FULL') then val
else sha2(val) -- return hash of the column value
end;
-- apply policy
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN full_name
SET MASKING POLICY sha2;
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN full_name
UNSET MASKING POLICY;
-- Validating policies
USE ROLE ANALYST_FULL;
SELECT * FROM CUSTOMERS;
USE ROLE ANALYST_MASKED;
SELECT * FROM CUSTOMERS;
USE ROLE ACCOUNTADMIN;
More examples 3
create or replace masking policy dates as (val date) returns date ->
case
when current_role() in ('ANALYST_FULL') then val
else date_from_parts(0001, 01, 01)::date -- returns 0001-01-01
end;
-- Apply policy on a specific column
ALTER TABLE IF EXISTS CUSTOMERS MODIFY COLUMN create_date
SET MASKING POLICY dates;
-- Validating policies
USE ROLE ANALYST_FULL;
SELECT * FROM CUSTOMERS;
USE ROLE ANALYST_MASKED;
SELECT * FROM CUSTOMERS;
Access Management >> ACCOUNTADMIN:
--- User 1 ---
CREATE USER maria PASSWORD = '123'
DEFAULT_ROLE = ACCOUNTADMIN
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE ACCOUNTADMIN TO USER maria;
--- User 2 ---
CREATE USER frank PASSWORD = '123'
DEFAULT_ROLE = SECURITYADMIN
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE SECURITYADMIN TO USER frank;
--- User 3 ---
CREATE USER adam PASSWORD = '123'
DEFAULT_ROLE = SYSADMIN
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE SYSADMIN TO USER adam;
SECURITYADMIN:
-- SECURITYADMIN role --
-- Create and Manage Roles & Users --
-- Create Sales Roles & Users for SALES--
create role sales_admin;
create role sales_users;
-- Create hierarchy
grant role sales_users to role sales_admin;
-- As per best practice assign roles to SYSADMIN
grant role sales_admin to role SYSADMIN;
-- create sales user
CREATE USER simon_sales PASSWORD = '123' DEFAULT_ROLE = sales_users
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE sales_users TO USER simon_sales;
-- create user for sales administration
CREATE USER olivia_sales_admin PASSWORD = '123' DEFAULT_ROLE = sales_admin
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE sales_admin TO USER olivia_sales_admin;
-- Create Sales Roles & Users for HR--
create role hr_admin;
create role hr_users;
-- Create hierarchy
grant role hr_users to role hr_admin;
-- This time we will not assign roles to SYSADMIN (against best practice)
-- grant role hr_admin to role SYSADMIN;
-- create hr user
CREATE USER oliver_hr PASSWORD = '123' DEFAULT_ROLE = hr_users
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE hr_users TO USER oliver_hr;
-- create user for sales administration
CREATE USER mike_hr_admin PASSWORD = '123' DEFAULT_ROLE = hr_admin
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE hr_admin TO USER mike_hr_admin;
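The resulting hierarchy and user assignments can be verified with SHOW GRANTS (a quick check of the setup above):
-- Verify role hierarchy and user assignments
SHOW GRANTS TO ROLE sales_admin;
SHOW GRANTS TO USER oliver_hr;
SHOW GRANTS OF ROLE hr_users;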
SYSADMIN:
-- SYSADMIN --
-- Create a warehouse of size X-SMALL
create warehouse public_wh with
warehouse_size='X-SMALL'
auto_suspend=300
auto_resume = true;
-- grant usage to role public
grant usage on warehouse public_wh
to role public;
-- create a database accessible to everyone
create database common_db;
grant usage on database common_db to role public;
-- create sales database for sales
create database sales_database;
grant ownership on database sales_database to role sales_admin;
grant ownership on schema sales_database.public to role sales_admin;
SHOW DATABASES;
-- create database for hr
create database hr_db;
grant ownership on database hr_db to role hr_admin;
grant ownership on schema hr_db.public to role hr_admin;
Custom Roles:
USE ROLE SALES_ADMIN;
USE SALES_DATABASE;
-- Create table --
create or replace table customers(
id number,
full_name varchar,
email varchar,
phone varchar,
spent number,
create_date DATE DEFAULT CURRENT_DATE);
-- insert values in table --
insert into customers (id, full_name, email,phone,spent)
values
(1,'Lewiss MacDwyer','lmacdwyer0@un.org','262-665-9168',140),
(2,'Ty Pettingall','tpettingall1@mayoclinic.com','734-987-7120',254),
(3,'Marlee Spadazzi','mspadazzi2@txnews.com','867-946-3659',120),
(4,'Heywood Tearney','htearney3@patch.com','563-853-8192',1230),
(5,'Odilia Seti','oseti4@globo.com','730-451-8637',143),
(6,'Meggie Washtell','mwashtell5@rediff.com','568-896-6138',600);
SHOW TABLES;
-- query from table --
SELECT * FROM CUSTOMERS;
USE ROLE SALES_USERS;
-- grant usage to role
USE ROLE SALES_ADMIN;
GRANT USAGE ON DATABASE SALES_DATABASE TO ROLE SALES_USERS;
GRANT USAGE ON SCHEMA SALES_DATABASE.PUBLIC TO ROLE SALES_USERS;
GRANT SELECT ON TABLE SALES_DATABASE.PUBLIC.CUSTOMERS TO ROLE SALES_USERS;
-- Validate privileges --
USE ROLE SALES_USERS;
SELECT * FROM CUSTOMERS;
DROP TABLE CUSTOMERS;
DELETE FROM CUSTOMERS;
SHOW TABLES;
-- grant DROP on table
USE ROLE SALES_ADMIN;
GRANT DELETE ON TABLE SALES_DATABASE.PUBLIC.CUSTOMERS TO ROLE SALES_USERS;
USE ROLE SALES_ADMIN;
USERADMIN:
-- USERADMIN --
--- User 4 ---
CREATE USER ben PASSWORD = '123'
DEFAULT_ROLE = ACCOUNTADMIN
MUST_CHANGE_PASSWORD = TRUE;
GRANT ROLE HR_ADMIN TO USER ben;
SHOW ROLES;
GRANT ROLE HR_ADMIN TO ROLE SYSADMIN;
Best Practices >> Monitoring Resources:
-- Table Storage
SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."TABLE_STORAGE_METRICS";
-- How much is queried in databases
SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY";
SELECT
DATABASE_NAME,
COUNT(*) AS NUMBER_OF_QUERIES,
SUM(CREDITS_USED_CLOUD_SERVICES)
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
GROUP BY DATABASE_NAME;
-- Usage of credits by warehouses
SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."WAREHOUSE_METERING_HISTORY";
-- Usage of credits by warehouses // Grouped by day
SELECT
DATE(START_TIME),
SUM(CREDITS_USED)
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."WAREHOUSE_METERING_HISTORY"
GROUP BY DATE(START_TIME);
-- Usage of credits by warehouses // Grouped by warehouse
SELECT
WAREHOUSE_NAME,
SUM(CREDITS_USED)
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."WAREHOUSE_METERING_HISTORY"
GROUP BY WAREHOUSE_NAME;
-- Usage of credits by warehouses // Grouped by warehouse & day
SELECT
DATE(START_TIME),
WAREHOUSE_NAME,
SUM(CREDITS_USED)
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."WAREHOUSE_METERING_HISTORY"
GROUP BY WAREHOUSE_NAME,DATE(START_TIME);
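Storage can be aggregated the same way, for example total active storage per database (a sketch using the same ACCOUNT_USAGE view as above):
-- Active storage per database in GB
SELECT
TABLE_CATALOG,
SUM(ACTIVE_BYTES)/(1024*1024*1024) AS ACTIVE_STORAGE_GB
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."TABLE_STORAGE_METRICS"
GROUP BY TABLE_CATALOG;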