SQL Queries
Important Capabilities
Capability | Status | Notes |
---|---|---|
Column-level Lineage | ✅ | Parsed from SQL queries |
Table-Level Lineage | ✅ | Parsed from SQL queries |
This source reads a newline-delimited JSON file containing SQL queries and parses them to generate lineage.
Query File Format
This file should contain one JSON object per line, with the following fields:
- query: string - The SQL query to parse.
- timestamp (optional): number - The timestamp of the query, in seconds since the epoch.
- user (optional): string - The user who ran the query. This user value will be directly converted into a DataHub user urn.
- operation_type (optional): string - Platform-specific operation type, used if the operation type can't be parsed.
- downstream_tables (optional): string[] - Fallback list of tables that the query writes to, used if the query can't be parsed.
- upstream_tables (optional): string[] - Fallback list of tables the query reads from, used if the query can't be parsed.
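The field list above can be sketched as a small helper that builds one record. This is only an illustration: `make_query_record` is a hypothetical name, not part of DataHub, and the check that `query` must be non-empty is an assumption rather than documented behavior.

```python
from typing import Any, Dict, List, Optional


def make_query_record(
    query: str,
    timestamp: Optional[float] = None,
    user: Optional[str] = None,
    operation_type: Optional[str] = None,
    downstream_tables: Optional[List[str]] = None,
    upstream_tables: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build one record using the field names listed above (hypothetical helper)."""
    # Assumption: an empty query is treated as an error here.
    if not isinstance(query, str) or not query.strip():
        raise ValueError("'query' is required and must be a non-empty string")

    record: Dict[str, Any] = {"query": query}
    optional_fields = {
        "timestamp": timestamp,  # seconds since the epoch
        "user": user,
        "operation_type": operation_type,
        "downstream_tables": downstream_tables,
        "upstream_tables": upstream_tables,
    }
    # Only emit the optional fields that were actually provided.
    for key, value in optional_fields.items():
        if value is not None:
            record[key] = value
    return record
```

Omitting unset optional fields keeps each line short; whether the source treats a missing field and an empty list identically is not stated here.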
Example Queries File
{"query": "SELECT x FROM my_table", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": [], "upstream_tables": ["my_database.my_schema.my_table"]}
{"query": "INSERT INTO my_table VALUES (1, 'a')", "timestamp": 1689232737.669, "user": "user_b", "downstream_tables": ["my_database.my_schema.my_table"], "upstream_tables": []}
Note that this file does not represent a single JSON object, but instead newline-delimited JSON, in which each line is a separate JSON object.
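As a minimal sketch of the newline-delimited format, the following writes and reads such a file with the standard library only. The function names are hypothetical, and skipping blank lines when reading is an assumption made for this example, not documented behavior of the source.

```python
import json
from typing import Any, Dict, Iterable, List


def write_queries_file(path: str, records: Iterable[Dict[str, Any]]) -> None:
    """Write one JSON object per line (hypothetical helper)."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            # json.dumps without indent emits a single line; newlines inside
            # the SQL text are escaped as \n, so each record stays on one line.
            f.write(json.dumps(record) + "\n")


def read_queries_file(path: str) -> List[Dict[str, Any]]:
    """Parse each non-blank line as its own JSON object (hypothetical helper)."""
    records: List[Dict[str, Any]] = []
    with open(path, encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # assumption: blank lines are ignored in this sketch
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as exc:
                raise ValueError(f"line {line_number} is not valid JSON: {exc}") from exc
    return records
```

Calling `json.load` on the whole file would fail for a file with more than one record, because the file as a whole is not a single JSON document.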
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
datahub_api: # Only necessary if using a non-DataHub sink, e.g. the file sink
  server: http://localhost:8080
  timeout_sec: 60
source:
  type: sql-queries
  config:
    platform: "snowflake"
    default_db: "SNOWFLAKE"
    query_file: "./queries.json"
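The same recipe can also be expressed as a Python dictionary and run programmatically. The sketch below assumes the `acryl-datahub` package is installed and that a DataHub instance is reachable at the given server URL. `build_recipe` and `run_recipe` are hypothetical helper names, and the explicit `datahub-rest` sink is an assumption added for this example; it is not part of the starter recipe.

```python
from typing import Any, Dict, Optional


def build_recipe(
    query_file: str,
    platform: str,
    default_db: Optional[str] = None,
    server: str = "http://localhost:8080",
) -> Dict[str, Any]:
    """Build a recipe dictionary for the sql-queries source (hypothetical helper)."""
    source_config: Dict[str, Any] = {"platform": platform, "query_file": query_file}
    if default_db is not None:
        source_config["default_db"] = default_db
    return {
        "source": {"type": "sql-queries", "config": source_config},
        # Assumption: send the results to a DataHub REST endpoint.
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe: Dict[str, Any]) -> None:
    """Run the recipe; needs acryl-datahub installed and a reachable server."""
    # Imported lazily so build_recipe can be used without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


if __name__ == "__main__":
    run_recipe(build_recipe("./queries.json", "snowflake", default_db="SNOWFLAKE"))
```

Running the YAML recipe through the `datahub ingest -c <recipe file>` command is the more common route; the programmatic form is mainly useful when the recipe has to be generated by other code.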
Config Details
Note that a dot (.) is used to denote nested fields in the YAML recipe. For example, usage.bucket_duration refers to the bucket_duration field nested under usage.
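To make the dotted notation concrete, here is a small sketch that expands dotted field names into the nested structure a YAML recipe would contain. `expand_dotted` is a hypothetical helper written for this illustration; it is not part of DataHub.

```python
from typing import Any, Dict


def expand_dotted(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Turn {"usage.top_n_queries": 5} into {"usage": {"top_n_queries": 5}}."""
    nested: Dict[str, Any] = {}
    for dotted_key, value in flat.items():
        parts = dotted_key.split(".")
        node = nested
        # Walk (or create) one dictionary level per path segment except the last.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested


config = expand_dotted(
    {
        "platform": "snowflake",
        "usage.bucket_duration": "HOUR",
        "usage.user_email_pattern.deny": ["svc-.*"],
    }
)
```

Here `config["usage"]["user_email_pattern"]["deny"]` holds the deny list, which is the nesting that the `usage.user_email_pattern.deny` row of the table describes.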
Field | Description |
---|---|
platform ✅ string | The platform for which to generate data, e.g. snowflake |
query_file ✅ string | Path to file to ingest |
default_db string | The default database to use for unqualified table names |
default_dialect string | The SQL dialect to use when parsing queries. Overrides automatic dialect detection. |
default_schema string | The default schema to use for unqualified table names |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
env string | The environment that all assets produced by this connector belong to. Default: PROD |
usage BaseUsageConfig | The usage config to use when generating usage statistics. Default: {'bucket_duration': 'DAY', 'end_time': '2025-01-10... |
usage.bucket_duration Enum | Size of the time window to aggregate usage stats. Default: DAY |
usage.end_time string(date-time) | Latest date of lineage/usage to consider. Default: Current time in UTC |
usage.format_sql_queries boolean | Whether to format SQL queries. Default: False |
usage.include_operational_stats boolean | Whether to display operational stats. Default: True |
usage.include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False |
usage.include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True |
usage.start_time string(date-time) | Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration ). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'. |
usage.top_n_queries integer | Number of top queries to save to each table. Default: 10 |
usage.user_email_pattern AllowDenyPattern | Regex patterns for user emails to filter in usage. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
usage.user_email_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
usage.user_email_pattern.allow array | List of regex patterns to include in ingestion. Default: ['.*'] |
usage.user_email_pattern.allow.string string | |
usage.user_email_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
usage.user_email_pattern.deny.string string | |
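The allow, deny, and ignoreCase options of usage.user_email_pattern can be illustrated with a short sketch. It assumes that a value is kept when it matches at least one allow pattern and no deny pattern, with patterns anchored at the start of the string via `re.match`. `is_allowed` is a hypothetical stand-in for illustration, not DataHub's own implementation.

```python
import re
from typing import Sequence


def is_allowed(
    value: str,
    allow: Sequence[str] = (".*",),
    deny: Sequence[str] = (),
    ignore_case: bool = True,
) -> bool:
    """Return True if value passes the allow/deny filter (assumed semantics)."""
    flags = re.IGNORECASE if ignore_case else 0
    # A deny match always wins over an allow match in this sketch.
    if any(re.match(pattern, value, flags) for pattern in deny):
        return False
    return any(re.match(pattern, value, flags) for pattern in allow)


emails = ["alice@example.com", "SVC-loader@example.com", "bob@other.org"]
kept = [
    e for e in emails
    if is_allowed(e, allow=[r".*@example\.com"], deny=[r"svc-.*"])
]
```

With the defaults shown in the table (allow everything, deny nothing), every user email passes the filter.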
The JSONSchema for this configuration is inlined below.
{
"title": "SqlQueriesSourceConfig",
"description": "Any source that connects to a platform should inherit this class",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
"type": "string"
},
"query_file": {
"title": "Query File",
"description": "Path to file to ingest",
"type": "string"
},
"platform": {
"title": "Platform",
"description": "The platform for which to generate data, e.g. snowflake",
"type": "string"
},
"usage": {
"title": "Usage",
"description": "The usage config to use when generating usage statistics",
"default": {
"bucket_duration": "DAY",
"end_time": "2025-01-10T22:33:33.706085+00:00",
"start_time": "2025-01-09T00:00:00+00:00",
"queries_character_limit": 24000,
"top_n_queries": 10,
"user_email_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"include_operational_stats": true,
"include_read_operational_stats": false,
"format_sql_queries": false,
"include_top_n_queries": true
},
"allOf": [
{
"$ref": "#/definitions/BaseUsageConfig"
}
]
},
"default_db": {
"title": "Default Db",
"description": "The default database to use for unqualified table names",
"type": "string"
},
"default_schema": {
"title": "Default Schema",
"description": "The default schema to use for unqualified table names",
"type": "string"
},
"default_dialect": {
"title": "Default Dialect",
"description": "The SQL dialect to use when parsing queries. Overrides automatic dialect detection.",
"type": "string"
}
},
"required": [
"query_file",
"platform"
],
"additionalProperties": false,
"definitions": {
"BucketDuration": {
"title": "BucketDuration",
"description": "An enumeration.",
"enum": [
"DAY",
"HOUR"
],
"type": "string"
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"BaseUsageConfig": {
"title": "BaseUsageConfig",
"type": "object",
"properties": {
"bucket_duration": {
"description": "Size of the time window to aggregate usage stats.",
"default": "DAY",
"allOf": [
{
"$ref": "#/definitions/BucketDuration"
}
]
},
"end_time": {
"title": "End Time",
"description": "Latest date of lineage/usage to consider. Default: Current time in UTC",
"type": "string",
"format": "date-time"
},
"start_time": {
"title": "Start Time",
"description": "Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
"type": "string",
"format": "date-time"
},
"top_n_queries": {
"title": "Top N Queries",
"description": "Number of top queries to save to each table.",
"default": 10,
"exclusiveMinimum": 0,
"type": "integer"
},
"user_email_pattern": {
"title": "User Email Pattern",
"description": "regex patterns for user emails to filter in usage.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"include_operational_stats": {
"title": "Include Operational Stats",
"description": "Whether to display operational stats.",
"default": true,
"type": "boolean"
},
"include_read_operational_stats": {
"title": "Include Read Operational Stats",
"description": "Whether to report read operational stats. Experimental.",
"default": false,
"type": "boolean"
},
"format_sql_queries": {
"title": "Format Sql Queries",
"description": "Whether to format sql queries",
"default": false,
"type": "boolean"
},
"include_top_n_queries": {
"title": "Include Top N Queries",
"description": "Whether to ingest the top_n_queries.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
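The top-level constraints in the schema above (two required fields and no additional properties) can be checked with a few lines of Python before running ingestion. This is a hand-rolled sketch under those two constraints only; it does not validate types or the nested usage block, and `check_config` is a hypothetical name.

```python
from typing import Any, Dict, List

# Field names copied from the "required" and "properties" sections of the schema.
REQUIRED_FIELDS = {"query_file", "platform"}
ALLOWED_FIELDS = REQUIRED_FIELDS | {
    "env",
    "platform_instance",
    "usage",
    "default_db",
    "default_schema",
    "default_dialect",
}


def check_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the top-level keys look fine."""
    problems: List[str] = []
    present = set(config)
    for key in sorted(REQUIRED_FIELDS - present):
        problems.append(f"missing required field: {key}")
    # "additionalProperties": false means unknown keys are rejected.
    for key in sorted(present - ALLOWED_FIELDS):
        problems.append(f"unknown field: {key}")
    return problems
```

A full JSON Schema validator would cover the types and nested definitions as well; this sketch only catches misspelled or missing top-level keys.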
Code Coordinates
- Class Name: datahub.ingestion.source.sql_queries.SqlQueriesSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for SQL Queries, feel free to ping us on our Slack.