Harnessing the capabilities of Snowflake opens up diverse avenues of functionality for data professionals. A notable feature is Snowflake's ability to send notifications, either directly via email or by generating files in cloud storage and emailing a link to them. Here's a walkthrough on setting up and using this feature.
Configuring the Notification Integration
To begin, create a notification integration in Snowflake:
create or replace notification integration my_email
type=email
enabled=true
allowed_recipients=('user@example.com');
In this example, we've named our integration `my_email`. Keep in mind that, for security, Snowflake only delivers email to users of the current account who have verified their email address.
A few salient aspects of the Notification System:
- Emails sent by the Notification System go through Snowflake's AWS deployments, using AWS Simple Email Service (SES).
- To manage message delivery, Snowflake may retain the content of an email sent via AWS for up to thirty days, after which it is deleted.
- Verifying email addresses is crucial. Each address in ALLOWED_RECIPIENTS must be verified by the Snowflake user it belongs to; otherwise the CREATE NOTIFICATION INTEGRATION command fails.
- A limit to note: each account can create a maximum of ten notification integrations.
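Once the integration exists, a message is sent by calling SYSTEM$SEND_EMAIL with the integration name, the recipients, a subject, and a body. The following is a minimal JavaScript sketch of how such a call might be assembled inside a stored procedure. The helper name `buildSendEmailStatement` is hypothetical, and the sketch assumes that bind variables are accepted for this call in your environment; it only builds the statement object and does not contact Snowflake.

```javascript
// Hypothetical helper: assemble the statement that sends one email through
// the my_email integration. Nothing here talks to Snowflake; the returned
// object is meant to be passed to snowflake.createStatement() in a procedure.
function buildSendEmailStatement(recipients, subject, body, mimeType) {
  if (!Array.isArray(recipients) || recipients.length === 0) {
    throw new Error("at least one recipient is required");
  }
  return {
    sqlText: "call system$send_email('my_email', ?, ?, ?, ?)",
    // SYSTEM$SEND_EMAIL takes the recipients as one comma-separated string.
    binds: [recipients.join(","), subject, body, mimeType || "text/plain"]
  };
}

const statement = buildSendEmailStatement(
  ["user@example.com"], "Test Email 123...", "<b>It works.</b>", "text/html");
console.log(statement.sqlText);
console.log(statement.binds);
```

Every recipient passed here still has to appear in the integration's ALLOWED_RECIPIENTS list and be a verified address.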
Crafting the Metadata Table
The linchpin of this solution is the metadata table, which holds the SQL commands to run along with their schedule and email details.
create or replace TABLE METADATA_EMAILNOTIFICATION(
ROWNUMBER NUMBER(38,0) identity(1,1) COMMENT 'KEY COLUMN RUNNING NUMBER',
SQLQUERY VARCHAR(100000) COLLATE 'en-ci',
FREQUENCY VARCHAR(50) COLLATE 'en-ci',--Weekly_5_16:30 (Weekly_ = weekly run, 5 = day of the week, 16:30 = hh:mm) OR Daily_16:30 (Daily_ = daily run, 16:30 = hh:mm) OR Every_2 (Every_ = repeating run, 2 = interval in hours)
EMAILTO VARCHAR(500) COLLATE 'en-ci',
SUBJECT VARCHAR(500) COLLATE 'en-ci',
ISACTIVE BOOLEAN,
LASTRUNDATETIME TIMESTAMP_NTZ(9),-- end date time
LASTRUNSTATUS BOOLEAN,
LASTRUNERROR VARCHAR(1000) COLLATE 'en-ci',
LASTSTARTDATETIME VARCHAR(1000) COLLATE 'en-ci',
COLUMNNAME VARCHAR(1000) COLLATE 'en-ci',
IsRunning BOOLEAN,
Send_as_blob_file BOOLEAN
);
The key columns are:
- SQLQUERY: The SQL command to run.
- FREQUENCY: How often the command runs. For instance, Weekly_5_16:30 means Fridays at 4:30 PM, Daily_16:30 means every day at 4:30 PM, and Every_2 means every two hours. Times are compared as text, so they must be written as zero-padded 24-hour hh:mm values.
- EMAILTO, SUBJECT: The recipients and subject line of the email. There is no separate body column; the body is generated from the query results.
- COLUMNNAME: A comma-separated list of column headings used when the results are rendered as a table in the email.
- ISACTIVE: Whether the row is active.
- LASTRUNDATETIME: A timestamp of the command's most recent execution.
- LASTRUNSTATUS: Whether the latest run succeeded.
- LASTRUNERROR: The error message from the latest run, if any.
- SEND_AS_BLOB_FILE: This is pivotal. When set to TRUE, the results are written as a CSV file to Azure blob storage through an external stage, and the email carries a link to the file instead of the data.
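Because the FREQUENCY value is a small string format of its own, it can help to validate it before a row is inserted. The sketch below is one possible parser, written in JavaScript to match the stored procedures; the function name `parseFrequency` and the shape of the returned object are assumptions for illustration, not part of the original solution.

```javascript
// Hypothetical validator for FREQUENCY values such as
// "Weekly_5_16:30", "Daily_16:30" or "Every_2".
function parseFrequency(value) {
  const parts = String(value).trim().split("_");
  const kind = parts[0].toLowerCase();
  // The procedure compares times as text, so require zero-padded 24-hour hh:mm.
  const isTime = (t) => /^([01]\d|2[0-3]):[0-5]\d$/.test(t);

  if (kind === "weekly" && parts.length === 3 &&
      /^[0-6]$/.test(parts[1]) && isTime(parts[2])) {
    return { kind: "weekly", dayOfWeek: Number(parts[1]), time: parts[2] };
  }
  if (kind === "daily" && parts.length === 2 && isTime(parts[1])) {
    return { kind: "daily", time: parts[1] };
  }
  if (kind === "every" && parts.length === 2 && /^[1-9]\d*$/.test(parts[1])) {
    return { kind: "every", hours: Number(parts[1]) };
  }
  throw new Error("Unrecognized FREQUENCY: " + value);
}

console.log(parseFrequency("Weekly_5_16:30"));
console.log(parseFrequency("Daily_16:30"));
console.log(parseFrequency("Every_2"));
```

A value such as `Daily_9:30` is rejected here on purpose: as text, `9:30` sorts after `16:30`, so the time comparison in the procedure would misfire.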
To populate this table, administrators insert one row for each query they want to run:
INSERT INTO METADATA_EMAILNOTIFICATION (SQLQUERY, FREQUENCY, EMAILTO, SUBJECT, ISACTIVE, COLUMNNAME,Send_as_blob_file)
VALUES
(
'SELECT top 5 ifnull(QUERY_ID,'''') as QUERY_ID, ifNULL(DATABASE_NAME,'''') as DATABASE_NAME, ifnull(USER_NAME,'''') as USER_NAME, ifnull(WAREHOUSE_SIZE,'''') as WAREHOUSE_SIZE, ifnull(QUERY_TYPE,'''') as QUERY_TYPE, ifnull(TOTAL_ELAPSED_TIME,0) as TOTAL_ELAPSED_TIME FROM snowflake.account_usage.QUERY_HISTORY',
'Every_2',
'user@example.com',
'Test Email 123...',
1,
'QUERY_ID,DATABASE_NAME,USER_NAME,WAREHOUSE_SIZE,QUERY_TYPE',
1
);
INSERT INTO METADATA_EMAILNOTIFICATION (SQLQUERY, FREQUENCY, EMAILTO, SUBJECT, ISACTIVE, COLUMNNAME,Send_as_blob_file)
VALUES
(
'select top 10 start_time,end_time,warehouse_id,warehouse_name from SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY ',
'Every_1',
'user@example.com',
'Test Email 123 output as table...',
1,
'start_time,end_time,warehouse_id,warehouse_name',
0
);
Utilizing Stored Procedures for Diverse Notifications
Two stored procedures do the work:
- `GETMETADATA_EMAILNOTIFICATION()`: Scans the metadata table for SQL commands that are due, based on frequency, active status, and whether a run is already in progress.
- `emailnotification`: Called for each row the first procedure selects. Depending on the `SEND_AS_BLOB_FILE` column, it either emails the query results as an HTML table or writes a CSV to Azure blob storage and emails a link to it.
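The selection rule that `GETMETADATA_EMAILNOTIFICATION()` applies is easier to reason about outside SQL. The following sketch restates it as a plain JavaScript function. The name `isDue` and the row fields (`frequency`, `isActive`, `isRunning`, `lastStart`) are hypothetical stand-ins for the table columns, and the elapsed-minute calculation is only an approximation of Snowflake's TIMEDIFF, which counts minute boundaries rather than whole elapsed minutes.

```javascript
function pad2(n) {
  return String(n).padStart(2, "0");
}

// Returns true when a metadata row should run at time `now`.
// row.lastStart is a Date, or null if the row has never started.
function isDue(row, now) {
  if (!row.isActive || row.isRunning) return false;

  const last = row.lastStart;
  const hhmm = pad2(now.getHours()) + ":" + pad2(now.getMinutes());
  const startedToday = last !== null && last.toDateString() === now.toDateString();
  const [kind, a, b] = row.frequency.split("_");

  switch (kind.toLowerCase()) {
    case "weekly": // Weekly_<day>_<hh:mm>, with 0 = Sunday as in Date.getDay()
      return !startedToday && Number(a) === now.getDay() && b <= hhmm;
    case "daily":  // Daily_<hh:mm>
      return !startedToday && a <= hhmm;
    case "every":  // Every_<hours>
      return last === null || Number(a) * 60 <= Math.floor((now - last) / 60000);
    default:
      return false;
  }
}

// 5 January 2024 is a Friday.
const fridayAfternoon = new Date(2024, 0, 5, 16, 45);
console.log(isDue({ frequency: "Weekly_5_16:30", isActive: true, isRunning: false, lastStart: null }, fridayAfternoon));
```

Note that the weekly and daily rules fire at most once per calendar day, at the first check on or after the scheduled time.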
Here is the code for the procedures:
CREATE OR REPLACE PROCEDURE GETMETADATA_EMAILNOTIFICATION()
RETURNS VARCHAR(16777216)
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS $$
// Declare every variable up front so nothing leaks into the global scope.
var rownum="",sqlquery="",columnname="",emails="",emailsubject="",result="",resultdata="",sql_updatestatement="",sqlupdatequery="",Send_as_blob_file="";
var errstr="";
try
{
var sql_query = "SELECT ROWNUMBER,SQLQUERY,EMAILTO,SUBJECT,COLUMNNAME,Send_as_blob_file,FREQUENCY FROM METADATA_EMAILNOTIFICATION ";
sql_query+= " WHERE ISACTIVE=1 and (IsRunning = 0 or IsRunning is null) And (( FREQUENCY like 'Weekly%' AND ( LastStartDateTime IS NULL OR TO_DATE(LastStartDateTime) <> CURRENT_DATE()) ";
sql_query+= " And SUBSTRING(FREQUENCY,8,1) = TO_CHAR( DATE_PART(dw, CURRENT_TIMESTAMP()::DATE)) ";
sql_query+= " And SUBSTRING(FREQUENCY,10) <= TO_CHAR(CURRENT_TIMESTAMP(), 'HH24:MI'))";
sql_query+= " Or ";
sql_query+= " (( FREQUENCY like 'Daily%' ) AND ( LastStartDateTime IS NULL OR TO_DATE(LastStartDateTime) <> CURRENT_DATE()) And ";
sql_query+= " REPLACE(FREQUENCY,'Daily_','') <= TO_CHAR(CURRENT_TIMESTAMP(),'HH24:MI')) ";
sql_query+= " Or ";
sql_query+= " (FREQUENCY LIKE 'Every_%' AND ( LastStartDateTime IS NULL OR ";
sql_query+= " (REPLACE(FREQUENCY,'Every_','')*60) <= TIMEDIFF(minute, LastStartDateTime, CURRENT_TIMESTAMP()))))" ;
var sql_statement = snowflake.createStatement({sqlText: sql_query});
var result_scan = sql_statement.execute();
while (result_scan.next())
{
rownum=result_scan.getColumnValue(1);
sqlquery=result_scan.getColumnValue(2);
emails=result_scan.getColumnValue(3);
emailsubject=result_scan.getColumnValue(4);
columnname=result_scan.getColumnValue(5);
Send_as_blob_file=result_scan.getColumnValue(6);
var sqlupdatequery_new = "update METADATA_EMAILNOTIFICATION set IsRunning =true,LastStartDateTime= CURRENT_TIMESTAMP() where rownumber="+rownum;
var sql_updatestatement_new = snowflake.createStatement({sqlText:sqlupdatequery_new});
sql_updatestatement_new.execute();
var sqlquery_new = sqlquery.replace(/'/g,"~~"); // a global regex replace does not depend on String.prototype.replaceAll being available
var call_sqlquery = "call emailnotification('"+ sqlquery_new+"','"+columnname+"','"+emails+"','"+ emailsubject+"',"+ Send_as_blob_file +")";
var sql_statement_call = snowflake.createStatement({sqlText:call_sqlquery});
var resultdata = sql_statement_call.execute();
resultdata.next();
result=resultdata.getColumnValue(1);
// Bind the message instead of concatenating it, so quotes in an error text cannot break the UPDATE.
var lastrunstatus = (String(result).toLowerCase() == "success");
var lastrunerror = lastrunstatus ? "" : String(result).substring(0,1000); // LASTRUNERROR is VARCHAR(1000)
sqlupdatequery = "update METADATA_EMAILNOTIFICATION set LASTRUNDATETIME= CURRENT_TIMESTAMP(),lastrunerror=? ,LASTRUNSTATUS ="+ lastrunstatus +",IsRunning =false where rownumber="+rownum;
sql_updatestatement = snowflake.createStatement({sqlText:sqlupdatequery, binds:[lastrunerror]});
sql_updatestatement.execute();
}
return "success";
}
catch (err)
{
errstr = "Failed: Code: " + err.code + "\\n State: " + err.state;
errstr += "\\n Message: " + err.message;
errstr += "\\nStack Trace:\\n" + err.stackTraceTxt;
// Only update the metadata row if one was being processed, and bind the text so quotes cannot break the UPDATE.
if(rownum !== "")
{
sqlupdatequery = "update METADATA_EMAILNOTIFICATION set LASTRUNSTATUS =false,IsRunning =false,lastrunerror=? where rownumber="+rownum;
sql_updatestatement = snowflake.createStatement({sqlText:sqlupdatequery, binds:[errstr.substring(0,1000)]});
sql_updatestatement.execute();
}
return errstr;
}
$$;
CREATE OR REPLACE PROCEDURE emailnotification(QUERY varchar(8000),COLUMNNAME VARCHAR(8000),EMAILS VARCHAR(1000),EMAILSUBJECT VARCHAR(1000),SEND_AS_BLOB_FILE BOOLEAN)
RETURNS VARCHAR(16777216)
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS $$
try
{
var resultdata="";
var QUERY = QUERY.replace(/~~/g,"'"); // restore the single quotes that the caller encoded as ~~
var stmt = snowflake.createStatement({sqlText: QUERY});
var resultSet = stmt.execute();
if(!resultSet.next())
{
return "no records found";
}
if(SEND_AS_BLOB_FILE == 1)
{
var today="",date="",time="",filename="",fileurl="";
today = new Date();
date = today.getFullYear()+'_'+(today.getMonth()+1)+'_'+today.getDate(); // getMonth() is zero-based
time = today.getHours()+'_'+today.getMinutes()+'_'+today.getSeconds();
filename = "METADATA_EMAILNOTIFICATION"+ date +"_"+ time +".csv";
fileurl="https://myfiles.blob.core.windows.net/snowflakefiles/"+filename;
var copy_command_query ="COPY INTO @my_azure_stage/"+ filename +" from ";
copy_command_query +=" ("+ QUERY +")";
copy_command_query +=" HEADER=TRUE";
copy_command_query +=" SINGLE = TRUE";
copy_command_query +=" OVERWRITE =TRUE;"
var sql_statement_copy = snowflake.createStatement({sqlText:copy_command_query});
sql_statement_copy.execute();
resultdata = "Output is available in the link below.<br>URL : " +fileurl ;
}
else
{
const myArray = COLUMNNAME.split(",");
// Build an HTML table: one header row from COLUMNNAME, then one row per result record.
resultdata='<table>';
resultdata+='<tr>';
for (var j = 0; j < myArray.length; j++)
{
resultdata+='<th>'+ myArray[j] +'</th>';
}
resultdata+='</tr>';
// resultSet.next() was already called once above, so the cursor is on the first row.
do
{
resultdata+='<tr>';
for (var j = 0; j < myArray.length; j++)
{
var str = resultSet.getColumnValue(j+1);
resultdata+='<td>'+ str +'</td>';
}
resultdata+='</tr>';
} while (resultSet.next());
resultdata+="</table>";
}
if(resultdata!="")
{
// Bind the recipients, subject and body so quotes in the data cannot break the call.
var sqlquery = "call system$send_email('my_email',?,?,?,'text/html')";
var sql_statement = snowflake.createStatement({sqlText:sqlquery, binds:[EMAILS,EMAILSUBJECT,resultdata]});
sql_statement.execute();
}
return "success";
}
catch(err)
{
var errstr;
errstr = "Failed: Code: " + err.code + "\\n State: " + err.state;
errstr += "\\n Message: " + err.message;
errstr += "\\nStack Trace:\\n" + err.stackTraceTxt;
return errstr;
}$$;
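The HTML body that `emailnotification` assembles for the table path can be prototyped outside Snowflake before it goes into the procedure. The sketch below is one way to do that, assuming the rows have already been fetched into plain arrays. The names `escapeHtml` and `renderHtmlTable` are hypothetical, and the escaping step is an addition that the procedure above does not perform.

```javascript
// Escape the characters that would otherwise be read as markup.
function escapeHtml(value) {
  const text = (value === null || value === undefined) ? "" : String(value);
  return text
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;");
}

// columns: array of header labels; rows: array of arrays of cell values.
function renderHtmlTable(columns, rows) {
  const header = "<tr>" +
    columns.map((c) => "<th>" + escapeHtml(c) + "</th>").join("") + "</tr>";
  const body = rows.map((row) =>
    "<tr>" + row.map((cell) => "<td>" + escapeHtml(cell) + "</td>").join("") + "</tr>"
  ).join("");
  return "<table>" + header + body + "</table>";
}

console.log(renderHtmlTable(
  ["warehouse_id", "warehouse_name"],
  [[1, "MYWH"], [2, null]]
));
```

Escaping matters here because the message is sent as text/html: a value containing `<` would otherwise be treated as the start of a tag by the mail client.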
Scheduling via Snowflake Tasks
To automate the whole flow, a Snowflake task calls the procedure every minute:
create or replace task my_job_task
warehouse = mywh
schedule = '1 minute'
as
call GETMETADATA_EMAILNOTIFICATION();
Although the task fires every minute, the GETMETADATA_EMAILNOTIFICATION() procedure only picks up rows that are due, so most runs do very little work. Note that a newly created task is suspended; run ALTER TASK my_job_task RESUME to start it.
Empowering Modern Data Teams
The advantages are hard to overlook:
- System administrators gain insight into account usage and can fetch granular reports on long-running queries.
- Governance teams can swiftly pinpoint non-standard joins and circulate alerts.
- Engineers can implement regular data quality checks.
- Adaptability is a highlight. Beyond emails, it’s feasible to adapt this model for tasks such as cancelling long-running queries or orchestrating data purges.
By blending Snowflake's built-in capabilities with a thoughtfully structured solution, data teams gain tighter control over their data environment.