Snowflake Cost Optimization: Beware of Rube Goldberg Queries | Technical Practice

Source: InfoQ - AI & Large Models


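A Rube Goldberg machine is a contraption deliberately designed to perform a simple task in an indirect and overly complicated way.

I think I may have coined the term "Rube Goldberg query." There are many ways to write a query that returns the same result. Queries of this kind are needlessly complex and take a very long time to run, yet a rewrite can easily deliver a large performance gain. Typical examples include unnecessary subqueries against the same very large table, EXCEPT queries run against the same large table, calling a function on a large number of rows instead of using simple inline code or a mapping table, and grouping by more columns than necessary.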


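I have recently taken part in several discussions where people complained that Snowflake is expensive.

I disagree!

Like any tool billed by usage, Snowflake needs continuous monitoring and appropriate controls.

I will skip the basic cost-optimization advice; I trust you already know about adjusting the default query timeout, suspending virtual warehouses, and similar techniques. Excellent tools such as Revefi can already resize virtual warehouses dynamically based on load and use generative AI to push optimization suggestions in real time for user training and outreach. The most commonly overlooked factor in Snowflake cost optimization is precisely that user training and outreach, and it is important work you can do without any third-party tool.

Human error and "Rube Goldberg queries" usually add significantly to consumption costs.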

How much does it cost to run a query in Snowflake?


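This may make me sound like a used-car salesman, but it really does depend.

Sure, you can roughly estimate it as (minutes / 60) × number of clusters × credits per hour × cost per credit, but that only gives you the worst-case cost, which assumes the warehouse would otherwise have been idle.

The true cost of a query lies in its effect on the warehouse and on the other queries running concurrently. If one additional query keeps a Large warehouse running for an extra 10 minutes before it suspends, the real cost of that query is (10 / 60) × 1 × 8 × 4 = $5.33. That still sounds a bit expensive, but in my experience most queries finish within seconds to a minute or two, and the warehouse usually has plenty of other concurrent activity.

The worst-case cost model is like buying a separate computer for every application instead of letting all the programs run together on the same machine.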
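The worst-case arithmetic above can be sketched as a small function. This is only an illustration: the function and parameter names are made up here, and the rate of $4 per credit is the figure implied by the article's example, not a published price.

```javascript
// Worst-case cost of a query, assuming the warehouse would otherwise be idle.
// Function and parameter names are illustrative, not part of any Snowflake API.
function worstCaseQueryCost(minutes, clusters, creditsPerHour, costPerCredit) {
  return (minutes / 60) * clusters * creditsPerHour * costPerCredit;
}

// The article's example: 10 extra minutes on one Large cluster
// (8 credits per hour) at an assumed $4 per credit.
var cost = worstCaseQueryCost(10, 1, 8, 4);
console.log(cost.toFixed(2)); // prints 5.33
```

Because several queries normally share the same warehouse time, the real marginal cost of any one of them is usually lower than this figure.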

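I cannot tell you exactly what a given query costs, but one thing is clear: shortening query execution time saves money directly.

Cost control becomes especially hard when many users run ad hoc queries. This is often overlooked, yet it is a key part of cost management.

What if you could identify slow queries in near real time?

What if you could automatically email users about their slow queries, with a link to the query details and a command to cancel the query if needed?

What if that email could also give them a new, optimized query along with an explanation of the changes?

What if you could do all of this without buying anything new?

The first step is to create a view that queries INFORMATION_SCHEMA.QUERY_HISTORY.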

create or replace view LONG_RUNNING_QUERIES_VW(
REPORT_TIME,
WAREHOUSE_NAME,
WAREHOUSE_SIZE,
USER_NAME,
QUERY_ID,
RUNNING_MINUTES,
DATABASE_NAME,
SCHEMA_NAME,
EXECUTION_STATUS,
QUERY_TEXT
) as
SELECT
    CURRENT_TIMESTAMP() AS report_time,
    WAREHOUSE_NAME,
    WAREHOUSE_SIZE,
    USER_NAME,
    QUERY_ID,
    DATEDIFF(MINUTE, START_TIME::TIMESTAMP, CURRENT_TIMESTAMP::TIMESTAMP) AS running_minutes,
    DATABASE_NAME,
    SCHEMA_NAME,
    EXECUTION_STATUS,
    QUERY_TEXT
FROM
    TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE
    EXECUTION_STATUS = 'RUNNING'
    AND DATEDIFF(MINUTE, START_TIME::TIMESTAMP, CURRENT_TIMESTAMP::TIMESTAMP) > 29 //means 30 Minutes+
ORDER BY
    START_TIME DESC;

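You now have a view that shows queries that are currently running and have been running longer than the threshold you set. Next, you can create a stored procedure that builds an HTML email. The email goes to the list of people you pass in when calling the procedure, plus any user who currently has a long-running query.

QUERY_ID is a hyperlink that opens the query in Snowflake.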

CREATE OR REPLACE PROCEDURE "EMAIL_LONG_RUNNING_QUERIES"("EMAIL_LIST" VARCHAR)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT='This version will email everyone with a long running query as long as they have validated their email'
EXECUTE AS CALLER
AS '
  // Initialize emails array to store all recipients
  var emails = [];
  var manualEmails = [];
  var query = "USE SECONDARY ROLES ALL";
  var stmt = snowflake.createStatement({sqlText: query});
  var rs = stmt.execute();
  
  // Validate input parameter and add manual emails
  if (EMAIL_LIST && EMAIL_LIST.trim() !== "") {
    manualEmails = EMAIL_LIST.split(",");
    for (var i = 0; i < manualEmails.length; i++) {
      var email = manualEmails[i].trim();
      if (email) {
        emails.push(email);
      }
    }
  }
  
  // Get email addresses from query
  var query = `
    WITH cte1 AS (
      SELECT L.user_name, u.email, 
      l.warehouse_name, l.warehouse_size, l.query_id, l.running_minutes, l.database_name, l.schema_name,
      l.execution_status, query_text
      FROM core_lines_stg_dev.admin.long_running_queries_vw L
      INNER JOIN snowflake.account_usage.users U ON u.name = l.user_name
    ),
    cte2 AS (
      SELECT DISTINCT email FROM cte1 WHERE email IS NOT NULL
      AND email <> ''DANGREENB3RG@EMAIL.COM'' --excluding myself to avoid dupe email
    )
    SELECT * FROM cte2;
  `;
  
  try {
    var stmt = snowflake.createStatement({sqlText: query});
    var rs = stmt.execute();
    
    // Add query-based emails to the emails array
    while (rs.next()) {
      var queryEmail = rs.getColumnValue("EMAIL");
      if (queryEmail && !emails.includes(queryEmail.toUpperCase())) {
        emails.push(queryEmail.toUpperCase());
      }
    }
    
    // If no emails found, return an error
    if (emails.length === 0) {
      return "Error: No valid email addresses found from parameters or query";
    }
  } catch (err) {
    return "Error executing email query: " + err;
  }
  
  // Use secondary roles for access
  var useSecRoles = "USE SECONDARY ROLES ALL";
  var stmtSec = snowflake.createStatement({sqlText: useSecRoles});
  var rsSec = stmtSec.execute();
  
  // Get the long running queries data
  var dataQuery = "SELECT * FROM core_lines_stg_dev.admin.long_running_queries_vw";
  var dataStmt = snowflake.createStatement({sqlText: dataQuery});
  var dataRs = dataStmt.execute();
  
  // Create HTML email body
  var body = "";
  body += "<html><body><h2>Long-Running Queries Report</h2>";
  body += "<p>The following queries have been running for more than 30 minutes:</p>";

  // ASSUMPTION: the HTML tags and the link target were lost when this article
  // was extracted. The markup below is a reconstruction, and queryUrlPrefix is
  // a placeholder: set it to the query-detail URL prefix for your own account.
  var queryUrlPrefix = "<your-query-detail-url-prefix>";

  // Create HTML table
  body += "<table border=''1'' cellpadding=''4'' cellspacing=''0''>";
  body += "<tr>";
  body += "<th>Warehouse</th>";
  body += "<th>Size</th>";
  body += "<th>User</th>";
  body += "<th>Query ID</th>";
  body += "<th>Minutes</th>";
  body += "<th>Database</th>";
  body += "<th>Schema</th>";
  body += "<th>Status</th>";
  body += "</tr>";

  // Add rows
  var count = 0;
  while (dataRs.next()) {
    count++;
    var queryId = dataRs.getColumnValue(''QUERY_ID'') || '''';
    body += "<tr>";
    // Warehouse name in bold
    body += "<td><b>" + (dataRs.getColumnValue(''WAREHOUSE_NAME'') || '''') + "</b></td>";
    // Warehouse size in bold
    body += "<td><b>" + (dataRs.getColumnValue(''WAREHOUSE_SIZE'') || '''') + "</b></td>";
    body += "<td>" + (dataRs.getColumnValue(''USER_NAME'') || '''') + "</td>";
    // Query ID as a hyperlink to the query in Snowflake
    body += "<td>" + "<a href=''" + queryUrlPrefix + queryId + "''>" + queryId + "</a></td>";
    // Running minutes in bold and red
    body += "<td><b style=''color:red''>" + (dataRs.getColumnValue(''RUNNING_MINUTES'') || '''') + "</b></td>";
    body += "<td>" + (dataRs.getColumnValue(''DATABASE_NAME'') || '''') + "</td>";
    body += "<td>" + (dataRs.getColumnValue(''SCHEMA_NAME'') || '''') + "</td>";
    body += "<td>" + (dataRs.getColumnValue(''EXECUTION_STATUS'') || '''') + "</td>";
    body += "</tr>";
  }
  body += "</table>";

  // Add summary
  if (count === 0) {
    body += "<p>No long-running queries found at this time.</p>";
  } else {
    body += "<p>" + count + " queries found. Click the Query ID to see the query in Snowflake.</p>";
  }

  // Add timestamp
  var timestamp = new Date().toISOString();
  body += "<p>Report generated at: " + timestamp + "</p>";
  body += "</body></html>";

  // Create email subject
  var subject = "Snowflake Long-Running Queries - " + count + " found";

  try {
    // Send the HTML email to each email in the combined list
    var successCount = 0;
    var errors = [];
    var emailsSent = [];

    for (var i = 0; i < emails.length; i++) {
      var email = emails[i].trim();
      if (email && !emailsSent.includes(email.toUpperCase())) {
        try {
          var send_stmt = snowflake.createStatement({
            sqlText: "CALL SYSTEM$SEND_EMAIL(?, ?, ?, ?, ''text/html'')",
            binds: [''email_integration'', email, subject, body]
          });
          send_stmt.execute();
          successCount++;
          emailsSent.push(email.toUpperCase()); // Track emails already sent to avoid duplicates
        } catch (err) {
          errors.push("Error sending to " + email + ": " + err);
        }
      }
    }

    if (errors.length > 0) {
      return "Emails sent: " + successCount + " of " + emails.length +
        ". Manual recipients: " + manualEmails.length +
        ". Query recipients: " + (emails.length - manualEmails.length) +
        ". Errors: " + errors.join("; ");
    } else {
      return "Email sent successfully to " + successCount + " unique recipients. Manual recipients: " +
        manualEmails.length + ". Query recipients: " + (emails.length - manualEmails.length) +
        ". " + count + " long-running queries reported.";
    }
  } catch (err) {
    return "Error sending email: " + err;
  }
';

If you run the stored procedure, it sends an email to you and to every user who has a long-running query.
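The procedure combines two recipient sources, the list passed in as a parameter and the addresses returned by the query, and avoids emailing anyone twice. A minimal sketch of that merging step on its own follows. The helper name `mergeRecipients` is hypothetical, and unlike the procedure it normalizes both sources to upper case up front so that a single check catches duplicates.

```javascript
// Hypothetical helper, not part of the original procedure: merge a
// comma-separated manual list with addresses returned by the query,
// dropping blanks and case-insensitive duplicates.
function mergeRecipients(emailList, queryEmails) {
  var seen = Object.create(null); // addresses already added, keyed in upper case
  var merged = [];
  var candidates = (emailList || "").split(",").concat(queryEmails || []);
  for (var i = 0; i < candidates.length; i++) {
    var email = String(candidates[i] || "").trim().toUpperCase();
    if (email && !seen[email]) {
      seen[email] = true;
      merged.push(email);
    }
  }
  return merged;
}

var recipients = mergeRecipients(
  "a@example.com, B@example.com",
  ["b@example.com", "c@example.com"]
);
console.log(recipients.join(", ")); // prints A@EXAMPLE.COM, B@EXAMPLE.COM, C@EXAMPLE.COM
```

Normalizing once at the start keeps the recipient counts in the return message consistent with the number of emails actually sent.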


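Next, create a task or an alert to schedule the stored procedure. I prefer alerts, because an alert can carry a condition so that it fires only when the view returns rows, which avoids sending empty emails. Note that neither tasks nor alerts support secondary roles, even though the stored procedure activates them in its own code. This means the owner of the task or alert must itself have the privileges to see the view's results.

You can test this with the following query: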

USE ROLE <role_name>; -- placeholder: the role that will own the task or alert
USE SECONDARY ROLES NONE;
SELECT * FROM LONG_RUNNING_QUERIES_VW;

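If the query returns rows, you can use that role to create the task or alert. If it does not, check the role hierarchy.

This approach is useful, but where does the GenAI come in?

We can pass query_text to SNOWFLAKE.CORTEX.COMPLETE with a suitable prompt so that it optimizes the query and explains the improvements.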

-- <query_text> is a placeholder for the text of the slow query
select snowflake.cortex.complete(
    'mistral-large2',
    'rewrite this Snowflake query for maximum performance, only show the new query and explain the improvements, format the results as html: <query_text>'
) as result;

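It is worth experimenting with different LLMs and prompts. Below is an example of the actual output from this query:

[Figure: sample output of the query above]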

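You could even modify the stored procedure to include the GenAI optimization suggestions automatically. Even if you do not, you can still notify users in near real time about their long-running queries that need attention.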
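One way to start on that modification is a small helper that prepares the Cortex call for a given query text. This is a sketch under assumptions rather than the author's implementation: the helper name `buildOptimizeStatement` is hypothetical, passing the model and prompt as bind variables is a choice made here to avoid quoting problems, and it has not been run against a live account.

```javascript
// Hypothetical helper: build the options object for snowflake.createStatement.
// The slow query text goes into a bind variable rather than being
// concatenated into the SQL, so quotes in the query cannot break the call.
function buildOptimizeStatement(queryText, model) {
  var prompt =
    "rewrite this Snowflake query for maximum performance, " +
    "only show the new query and explain the improvements, " +
    "format the results as html: " + queryText;
  return {
    sqlText: "SELECT SNOWFLAKE.CORTEX.COMPLETE(?, ?) AS RESULT",
    binds: [model || "mistral-large2", prompt]
  };
}

// Inside the stored procedure this could be used roughly as follows
// (assumed usage, shown as comments because `snowflake` exists only there):
//   var rs = snowflake.createStatement(buildOptimizeStatement(text)).execute();
//   if (rs.next()) { body += rs.getColumnValue("RESULT"); }
var opts = buildOptimizeStatement("SELECT * FROM big_table");
console.log(opts.sqlText);
console.log(opts.binds[0]);
```

Each call to the model adds its own cost and latency, so it may make sense to request a suggestion only for queries that exceed the threshold by a wide margin.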