Cassandra
Important Capabilities
Capability | Status | Notes |
---|---|---|
Asset Containers | ✅ | Enabled by default |
Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata |
Platform Instance | ✅ | Enabled by default |
Schema Metadata | ✅ | Enabled by default |
This plugin extracts the following:
- Metadata for tables
- Column types associated with each table column
- The keyspace each table belongs to
Setup
This integration pulls metadata directly from Cassandra databases, including both DataStax Astra DB and Cassandra Enterprise Edition (EE).
You’ll need to have a Cassandra instance or an Astra DB setup with appropriate access permissions.
Steps to Get the Required Information
Set Up User Credentials:
- For Astra DB:
- Log in to your Astra DB Console.
- Navigate to Organization Settings > Token Management.
- Generate an Application Token with the required permissions for read access.
- Download the Secure Connect Bundle from the Astra DB Console.
- For Cassandra EE:
- Ensure you have a username and password with read access to the necessary keyspaces.
- For Astra DB:
Permissions:
- The user or token must have
SELECT
permissions that allow it to:- Access metadata in system keyspaces (e.g.,
system_schema
) to retrieve information about keyspaces, tables, columns, and views. - Perform
SELECT
operations on the data tables if data profiling is enabled.
- Access metadata in system keyspaces (e.g.,
- The user or token must have
Verify Database Access:
- For Astra DB: Ensure the Secure Connect Bundle is used and configured correctly.
- For Cassandra Opensource: Ensure the contact point and port are accessible.
When enabling profiling, make sure to set a limit on the number of rows to sample. Profiling large tables without a limit may lead to excessive resource consumption and slow performance.
For cloud configuration with Astra DB, it is necessary to specify the Secure Connect Bundle path in the configuration. For that reason, use the CLI to ingest metadata into DataHub.
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: "cassandra"
config:
# Credentials for on prem cassandra
contact_point: "localhost"
port: 9042
username: "admin"
password: "password"
# Or
# Credentials Astra Cloud
#cloud_config:
# secure_connect_bundle: "Path to Secure Connect Bundle (.zip)"
# token: "Application Token"
# Optional Allow / Deny extraction of particular keyspaces.
keyspace_pattern:
allow: [".*"]
# Optional Allow / Deny extraction of particular tables.
table_pattern:
allow: [".*"]
# Optional
profiling:
enabled: true
profile_table_level_only: true
sink:
# config sinks
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
contact_point string | Domain or IP address of the Cassandra instance (excluding port). Default: localhost |
password string | Password credential associated with the specified username. |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
port integer | Port number to connect to the Cassandra instance. Default: 9042 |
username string | Username credential with read access to the system_schema keyspace. |
env string | The environment that all assets produced by this connector belong to Default: PROD |
cloud_config CassandraCloudConfig | Configuration for cloud-based Cassandra, such as DataStax Astra DB. |
cloud_config.secure_connect_bundle ❓ string | File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB. |
cloud_config.token ❓ string | The Astra DB application token used for authentication. |
cloud_config.connect_timeout integer | Timeout in seconds for establishing new connections to Cassandra. Default: 600 |
cloud_config.request_timeout integer | Timeout in seconds for individual Cassandra requests. Default: 600 |
keyspace_pattern AllowDenyPattern | Regex patterns to filter keyspaces for ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
keyspace_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
keyspace_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
keyspace_pattern.allow.string string | |
keyspace_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
keyspace_pattern.deny.string string | |
profile_pattern AllowDenyPattern | Regex patterns for tables to profile Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
profile_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
profile_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
profile_pattern.allow.string string | |
profile_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
profile_pattern.deny.string string | |
table_pattern AllowDenyPattern | Regex patterns to filter keyspaces.tables for ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
table_pattern.allow.string string | |
table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
table_pattern.deny.string string | |
profiling GEProfilingBaseConfig | Configuration for profiling Default: {'enabled': False, 'operation_config': {'lower_fre... |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.include_field_distinct_count boolean | Whether to profile for the number of distinct values for each column. Default: True |
profiling.include_field_distinct_value_frequencies boolean | Whether to profile for distinct value frequencies. Default: False |
profiling.include_field_histogram boolean | Whether to profile for the histogram for numeric fields. Default: False |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_mean_value boolean | Whether to profile for the mean value of numeric columns. Default: True |
profiling.include_field_median_value boolean | Whether to profile for the median value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.include_field_quantiles boolean | Whether to profile for the quantiles of numeric columns. Default: False |
profiling.include_field_sample_values boolean | Whether to profile for the sample values for all columns. Default: True |
profiling.include_field_stddev_value boolean | Whether to profile for the standard deviation of numeric columns. Default: True |
profiling.limit integer | Max number of documents to profile. By default, profiles all documents. |
profiling.max_workers integer | Number of worker threads to use for profiling. Set to 1 to disable. Default: 20 |
profiling.offset integer | Offset in documents to profile. By default, uses no offset. |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only, or include column-level profiling as well. Default: False |
profiling.operation_config OperationConfig | Experimental feature. To specify operation configs. |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling. Default: False |
profiling.operation_config.profile_date_of_month integer | Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect. |
profiling.operation_config.profile_day_of_week integer | Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect. |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Configuration for stateful ingestion and stale metadata removal. |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"title": "CassandraSourceConfig",
"description": "Configuration for connecting to a Cassandra or DataStax Astra DB source.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Configuration for stateful ingestion and stale metadata removal.",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
"type": "string"
},
"contact_point": {
"title": "Contact Point",
"description": "Domain or IP address of the Cassandra instance (excluding port).",
"default": "localhost",
"type": "string"
},
"port": {
"title": "Port",
"description": "Port number to connect to the Cassandra instance.",
"default": 9042,
"type": "integer"
},
"username": {
"title": "Username",
"description": "Username credential with read access to the system_schema keyspace.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Password credential associated with the specified username.",
"type": "string"
},
"cloud_config": {
"title": "Cloud Config",
"description": "Configuration for cloud-based Cassandra, such as DataStax Astra DB.",
"allOf": [
{
"$ref": "#/definitions/CassandraCloudConfig"
}
]
},
"keyspace_pattern": {
"title": "Keyspace Pattern",
"description": "Regex patterns to filter keyspaces for ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns to filter keyspaces.tables for ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profile_pattern": {
"title": "Profile Pattern",
"description": "Regex patterns for tables to profile",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profiling": {
"title": "Profiling",
"description": "Configuration for profiling",
"default": {
"enabled": false,
"operation_config": {
"lower_freq_profile_enabled": false,
"profile_day_of_week": null,
"profile_date_of_month": null
},
"limit": null,
"offset": null,
"profile_table_level_only": false,
"include_field_null_count": true,
"include_field_distinct_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": false,
"include_field_distinct_value_frequencies": false,
"include_field_histogram": false,
"include_field_sample_values": true,
"max_workers": 20
},
"allOf": [
{
"$ref": "#/definitions/GEProfilingBaseConfig"
}
]
}
},
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
"default": {},
"type": "object"
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"CassandraCloudConfig": {
"title": "CassandraCloudConfig",
"description": "Configuration for connecting to DataStax Astra DB in the cloud.",
"type": "object",
"properties": {
"token": {
"title": "Token",
"description": "The Astra DB application token used for authentication.",
"type": "string"
},
"secure_connect_bundle": {
"title": "Secure Connect Bundle",
"description": "File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.",
"type": "string"
},
"connect_timeout": {
"title": "Connect Timeout",
"description": "Timeout in seconds for establishing new connections to Cassandra.",
"default": 600,
"type": "integer"
},
"request_timeout": {
"title": "Request Timeout",
"description": "Timeout in seconds for individual Cassandra requests.",
"default": 600,
"type": "integer"
}
},
"required": [
"token",
"secure_connect_bundle"
],
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"OperationConfig": {
"title": "OperationConfig",
"type": "object",
"properties": {
"lower_freq_profile_enabled": {
"title": "Lower Freq Profile Enabled",
"description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
"default": false,
"type": "boolean"
},
"profile_day_of_week": {
"title": "Profile Day Of Week",
"description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
},
"profile_date_of_month": {
"title": "Profile Date Of Month",
"description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
}
},
"additionalProperties": false
},
"GEProfilingBaseConfig": {
"title": "GEProfilingBaseConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"operation_config": {
"title": "Operation Config",
"description": "Experimental feature. To specify operation configs.",
"allOf": [
{
"$ref": "#/definitions/OperationConfig"
}
]
},
"limit": {
"title": "Limit",
"description": "Max number of documents to profile. By default, profiles all documents.",
"type": "integer"
},
"offset": {
"title": "Offset",
"description": "Offset in documents to profile. By default, uses no offset.",
"type": "integer"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only, or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_distinct_count": {
"title": "Include Field Distinct Count",
"description": "Whether to profile for the number of distinct values for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_mean_value": {
"title": "Include Field Mean Value",
"description": "Whether to profile for the mean value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_median_value": {
"title": "Include Field Median Value",
"description": "Whether to profile for the median value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_stddev_value": {
"title": "Include Field Stddev Value",
"description": "Whether to profile for the standard deviation of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_quantiles": {
"title": "Include Field Quantiles",
"description": "Whether to profile for the quantiles of numeric columns.",
"default": false,
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"title": "Include Field Distinct Value Frequencies",
"description": "Whether to profile for distinct value frequencies.",
"default": false,
"type": "boolean"
},
"include_field_histogram": {
"title": "Include Field Histogram",
"description": "Whether to profile for the histogram for numeric fields.",
"default": false,
"type": "boolean"
},
"include_field_sample_values": {
"title": "Include Field Sample Values",
"description": "Whether to profile for the sample values for all columns.",
"default": true,
"type": "boolean"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"default": 20,
"type": "integer"
}
},
"additionalProperties": false
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.cassandra.cassandra.CassandraSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Cassandra, feel free to ping us on our Slack.