Pulsar
Integration Details
The Datahub Pulsar source plugin extracts topic
and schema
metadata from an Apache Pulsar instance and ingest the information into Datahub. The plugin uses the Pulsar admin Rest API interface to interact with the Pulsar instance. The following APIs are used in order to:
- Get the list of existing tenants
- Get the list of namespaces associated with each tenant
- Get the list of topics associated with each namespace
- persistent topics
- persistent partitioned topics
- non-persistent topics
- non-persistent partitioned topics
- Get the latest schema associated with each topic
The data is extracted on tenant
and namespace
basis, topics with corresponding schema (if available) are ingested as Dataset into Datahub. Some additional values like schema description
, schema_version
, schema_type
and partitioned
are included as DatasetProperties
.
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
pulsar | Data Platform | |
Pulsar Topic | Dataset | subType: topic |
Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition. |
Metadata Ingestion Quickstart
For context on getting started with ingestion, check out our metadata ingestion guide.
Important Capabilities
Capability | Status | Notes |
---|---|---|
Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata |
Domains | ✅ | Supported via the domain config field |
Platform Instance | ✅ | Enabled by default |
Schema Metadata | ✅ | Enabled by default |
PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)
NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
Prerequisites
In order to ingest metadata from Apache Pulsar, you will need:
- Access to a Pulsar Instance, if authentication is enabled a valid access token.
- Pulsar version >= 2.7.0
NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: "pulsar"
config:
env: "TEST"
platform_instance: "local"
## Pulsar client connection config ##
web_service_url: "https://localhost:8443"
verify_ssl: "/opt/certs/ca.cert.pem"
# Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
issuer_url: <issuer_url>
client_id: ${CLIENT_ID}
client_secret: ${CLIENT_SECRET}
# Tenant list to scrape
tenants:
- tenant_1
- tenant_2
# Topic filter pattern
topic_patterns:
allow:
- ".*sales.*"
sink:
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
client_id string | The application's client ID |
client_secret string | The application's client secret |
exclude_individual_partitions boolean | Extract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datasets. Default: True |
issuer_url string | The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication. |
oid_config object | Placeholder for OpenId discovery document |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
timeout integer | Timout setting, how long to wait for the Pulsar rest api to send data before giving up Default: 5 |
token string | The access token for the application. Mandatory for token based authentication. |
verify_ssl One of boolean, string | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Default: True |
web_service_url string | The web URL for the cluster. Default: http://localhost:8080 |
env string | The environment that all assets produced by this connector belong to Default: PROD |
domain map(str,AllowDenyPattern) | A class to store allow deny regexes |
domain. key .allowarray | List of regex patterns to include in ingestion Default: ['.*'] |
domain. key .allow.stringstring | |
domain. key .ignoreCaseboolean | Whether to ignore case sensitivity during pattern matching. Default: True |
domain. key .denyarray | List of regex patterns to exclude from ingestion. Default: [] |
domain. key .deny.stringstring | |
namespace_patterns AllowDenyPattern | List of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied. Default: {'allow': ['.*'], 'deny': ['public/functions'], 'i... |
namespace_patterns.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
namespace_patterns.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
namespace_patterns.allow.string string | |
namespace_patterns.deny array | List of regex patterns to exclude from ingestion. Default: [] |
namespace_patterns.deny.string string | |
tenant_patterns AllowDenyPattern | List of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed. Default: {'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase'... |
tenant_patterns.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
tenant_patterns.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
tenant_patterns.allow.string string | |
tenant_patterns.deny array | List of regex patterns to exclude from ingestion. Default: [] |
tenant_patterns.deny.string string | |
tenants array | Listing all tenants requires superUser role, alternative you can set a list of tenants you want to scrape using the tenant admin role Default: [] |
tenants.string string | |
topic_patterns AllowDenyPattern | List of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied. Default: {'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase'... |
topic_patterns.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
topic_patterns.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
topic_patterns.allow.string string | |
topic_patterns.deny array | List of regex patterns to exclude from ingestion. Default: [] |
topic_patterns.deny.string string | |
stateful_ingestion StatefulStaleMetadataRemovalConfig | see Stateful Ingestion |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"title": "PulsarSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "see Stateful Ingestion",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"web_service_url": {
"title": "Web Service Url",
"description": "The web URL for the cluster.",
"default": "http://localhost:8080",
"type": "string"
},
"timeout": {
"title": "Timeout",
"description": "Timout setting, how long to wait for the Pulsar rest api to send data before giving up",
"default": 5,
"type": "integer"
},
"issuer_url": {
"title": "Issuer Url",
"description": "The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication.",
"type": "string"
},
"client_id": {
"title": "Client Id",
"description": "The application's client ID",
"type": "string"
},
"client_secret": {
"title": "Client Secret",
"description": "The application's client secret",
"type": "string"
},
"token": {
"title": "Token",
"description": "The access token for the application. Mandatory for token based authentication.",
"type": "string"
},
"verify_ssl": {
"title": "Verify Ssl",
"description": "Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.",
"default": true,
"anyOf": [
{
"type": "boolean"
},
{
"type": "string"
}
]
},
"tenant_patterns": {
"title": "Tenant Patterns",
"description": "List of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed.",
"default": {
"allow": [
".*"
],
"deny": [
"pulsar"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"namespace_patterns": {
"title": "Namespace Patterns",
"description": "List of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied.",
"default": {
"allow": [
".*"
],
"deny": [
"public/functions"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"topic_patterns": {
"title": "Topic Patterns",
"description": "List of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied.",
"default": {
"allow": [
".*"
],
"deny": [
"/__.*$"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"exclude_individual_partitions": {
"title": "Exclude Individual Partitions",
"description": "Extract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datasets.",
"default": true,
"type": "boolean"
},
"tenants": {
"title": "Tenants",
"description": "Listing all tenants requires superUser role, alternative you can set a list of tenants you want to scrape using the tenant admin role",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"domain": {
"title": "Domain",
"description": "Domain patterns",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"oid_config": {
"title": "Oid Config",
"description": "Placeholder for OpenId discovery document",
"type": "object"
}
},
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
"default": {},
"type": "object"
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.pulsar.PulsarSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Pulsar, feel free to ping us on our Slack.