Version: Next

Kafka

Extract Topics & Schemas from Apache Kafka or Confluent Cloud. Certified

Important Capabilities

Capability | Notes
Descriptions | Set dataset description to top level doc field for Avro schema
Detect Deleted Entities | Optionally enabled via stateful_ingestion.remove_stale_metadata
Platform Instance | For multiple Kafka clusters, use the platform_instance configuration
Schema Metadata | Schemas associated with each topic are extracted from the schema registry. Avro and Protobuf (certified), JSON (incubating). Schema references are supported.

This plugin extracts the following:

  • Topics from the Kafka broker
  • Schemas associated with each topic from the schema registry (Avro, Protobuf and JSON schemas are supported)

CLI based Ingestion

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "kafka"
  config:
    platform_instance: "YOUR_CLUSTER_ID"
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: http://localhost:8081

sink:
  # sink configs


Config Details

Note that a . is used to denote nested fields in the YAML recipe.

Field | Type | Description | Default
convert_urns_to_lowercase | boolean | Whether to convert dataset urns to lowercase. | False
disable_topic_record_naming_strategy | boolean | Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy | False
enable_meta_mapping | boolean | When enabled, applies the mappings that are defined through the meta_mapping directives. | True
field_meta_mapping | object | Mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings. | {}
ignore_warnings_on_schema_type | boolean | Disables warnings reported for non-AVRO/Protobuf value or key schemas if set. | False
ingest_schemas_as_entities | boolean | Enables ingesting schemas from schema registry as separate entities, in addition to the topics | False
meta_mapping | object | Mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings. | {}
platform_instance | string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
schema_registry_class | string | The fully qualified custom implementation class that implements the KafkaSchemaRegistryBase interface. | datahub.ingestion.source.confluent_schema_registry...
schema_tags_field | string | The field name in the schema metadata that contains the tags to be added to the dataset. | tags
strip_user_ids_from_email | boolean | Whether or not to strip email id while adding owners using meta mappings. | False
tag_prefix | string | Prefix added to tags during ingestion. | (empty string)
topic_subject_map | map(str,string) | |
env | string | The environment that all assets produced by this connector belong to | PROD
connection | KafkaConsumerConnectionConfig | | {'bootstrap': 'localhost:9092', 'schema_registry_u...
connection.bootstrap | string | | localhost:9092
connection.client_timeout_seconds | integer | The request timeout used when interacting with the Kafka APIs. | 60
connection.consumer_config | object | Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md . |
connection.schema_registry_config | object | Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient |
connection.schema_registry_url | string | | http://localhost:8080/schema-registry/api/
domain | map(str,AllowDenyPattern) | A class to store allow deny regexes |
domain.key.allow | array | List of regex patterns to include in ingestion | ['.*']
domain.key.allow.string | string | |
domain.key.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True
domain.key.deny | array | List of regex patterns to exclude from ingestion. | []
domain.key.deny.string | string | |
topic_patterns | AllowDenyPattern | | {'allow': ['.*'], 'deny': ['^_.*'], 'ignoreCase': ...
topic_patterns.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True
topic_patterns.allow | array | List of regex patterns to include in ingestion | ['.*']
topic_patterns.allow.string | string | |
topic_patterns.deny | array | List of regex patterns to exclude from ingestion. | []
topic_patterns.deny.string | string | |
stateful_ingestion | StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. |
stateful_ingestion.enabled | boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False | False
stateful_ingestion.remove_stale_metadata | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True
Note: Stateful Ingestion is available only when a Platform Instance is assigned to this source.
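
The stale metadata removal described for stateful_ingestion.remove_stale_metadata can be pictured as a set difference between two runs. The following is only a minimal sketch of that idea, not DataHub's implementation; the function name find_stale_urns and the example urns are made up for illustration.

```python
def find_stale_urns(previous_run_urns, current_run_urns):
    """Return urns seen in the last successful run but absent from this one.

    Illustrative only: these are the entities that would be candidates
    for soft-deletion when stale metadata removal is enabled.
    """
    return sorted(set(previous_run_urns) - set(current_run_urns))


# Hypothetical urns from two consecutive runs of the same pipeline.
previous = [
    "urn:li:dataset:(urn:li:dataPlatform:kafka,orders,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:kafka,payments,PROD)",
]
current = ["urn:li:dataset:(urn:li:dataPlatform:kafka,orders,PROD)"]

stale = find_stale_urns(previous, current)
print(stale)
```

Here the payments topic no longer appears in the current run, so it is the only urn reported as stale.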

Connecting to Confluent Cloud

If you are using Confluent Cloud, you can use a recipe like the one below. In it, consumer_config.sasl.username and consumer_config.sasl.password are the API credentials for your cluster, which you get in the Confluent UI under your cluster -> Data Integration -> API Keys. schema_registry_config.basic.auth.user.info holds the API credentials for the Confluent schema registry, which you get in the Confluent UI under Schema Registry -> API credentials.

When creating the API key for the cluster, ensure that the ACLs associated with the key are set as shown below. This is required for DataHub to read topic metadata from topics in Confluent Cloud.

Topic Name = *
Permission = ALLOW
Operation = DESCRIBE
Pattern Type = LITERAL

source:
  type: "kafka"
  config:
    platform_instance: "YOUR_CLUSTER_ID"
    connection:
      bootstrap: "abc-defg.eu-west-1.aws.confluent.cloud:9092"
      consumer_config:
        security.protocol: "SASL_SSL"
        sasl.mechanism: "PLAIN"
        sasl.username: "${CLUSTER_API_KEY_ID}"
        sasl.password: "${CLUSTER_API_KEY_SECRET}"
      schema_registry_url: "https://abc-defgh.us-east-2.aws.confluent.cloud"
      schema_registry_config:
        basic.auth.user.info: "${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}"

sink:
  # sink configs

To assign domains to your topics, you can use a configuration like the one below.

source:
  type: "kafka"
  config:
    # ...connection block
    domain:
      "urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810":
        allow:
          - ".*"
      "urn:li:domain:d6ec9868-6736-4b1f-8aa6-fee4c5948f17":
        deny:
          - ".*"

Note that the domain in the config above can be either a urn or a domain id (e.g. urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810 or simply 13ae4d85-d955-49fc-8474-9004c663a810). The Domain should exist in your DataHub instance before ingesting data into the Domain. To create a Domain on DataHub, check out the Domains User Guide.
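
To make the allow/deny semantics of the domain and topic_patterns settings more concrete, here is a simplified sketch. It assumes that patterns are matched from the start of the topic name, that a deny match always wins over an allow match, and that a topic is assigned to the first domain whose patterns accept it. The helper names is_allowed and domain_for_topic are hypothetical and are not part of DataHub's API.

```python
import re


def is_allowed(name, allow=(".*",), deny=(), ignore_case=True):
    """Return True if name matches an allow pattern and no deny pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


def domain_for_topic(topic, domain_config):
    """Return the first domain whose patterns accept the topic, else None."""
    for domain, patterns in domain_config.items():
        if is_allowed(topic, **patterns):
            return domain
    return None


# Same shape as the domain block in the recipe above.
domain_config = {
    "urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810": {"allow": [".*"]},
    "urn:li:domain:d6ec9868-6736-4b1f-8aa6-fee4c5948f17": {"deny": [".*"]},
}

print(domain_for_topic("orders", domain_config))
print(is_allowed("__consumer_offsets", deny=["^_.*"]))
```

Under these assumptions every topic lands in the first domain, and the second domain never receives a topic because its deny list rejects everything. The second call shows why a deny pattern such as ^_.* keeps internal topics out of ingestion.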

If you are using a non-default subject naming strategy in the schema registry, such as RecordNameStrategy, the mapping for the topic's key and value schemas to the schema registry subject names should be provided via topic_subject_map as shown in the configuration below.

source:
  type: "kafka"
  config:
    # ...connection block
    # Defines the mapping for the key & value schemas associated with a topic & the subject name registered with the
    # kafka schema registry.
    topic_subject_map:
      # Defines both key & value schema for topic 'my_topic_1'
      "my_topic_1-key": "io.acryl.Schema1"
      "my_topic_1-value": "io.acryl.Schema2"
      # Defines only the value schema for topic 'my_topic_2' (the topic doesn't have a key schema).
      "my_topic_2-value": "io.acryl.Schema3"

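As a rough illustration of how a topic_subject_map entry could take precedence over the default naming, the sketch below looks up "<topic>-key" or "<topic>-value" in the map and falls back to that same name, which is the default TopicNameStrategy convention. The function resolve_subject is hypothetical; the actual lookup in the source may consider additional strategies.

```python
def resolve_subject(topic, is_key, topic_subject_map=None):
    """Pick the schema registry subject for a topic's key or value schema.

    An explicit topic_subject_map entry wins; otherwise fall back to the
    default "<topic>-key" / "<topic>-value" subject name.
    """
    lookup_key = topic + ("-key" if is_key else "-value")
    return (topic_subject_map or {}).get(lookup_key, lookup_key)


# Same entries as the topic_subject_map in the recipe above.
topic_subject_map = {
    "my_topic_1-key": "io.acryl.Schema1",
    "my_topic_1-value": "io.acryl.Schema2",
    "my_topic_2-value": "io.acryl.Schema3",
}

print(resolve_subject("my_topic_1", is_key=False, topic_subject_map=topic_subject_map))
print(resolve_subject("unmapped_topic", is_key=True, topic_subject_map=topic_subject_map))
```

The first call resolves to the mapped record name, while the second falls back to the default subject name because the topic has no entry in the map.
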
Custom Schema Registry

The Kafka source uses the schema registry to determine the schema associated with both the key and the value of each topic. By default, it uses the Confluent Kafka Schema Registry and supports the AVRO and PROTOBUF schema types.

If you are using a custom schema registry, or a schema type other than AVRO or PROTOBUF, you can provide your own implementation of the KafkaSchemaRegistryBase class and implement the get_schema_metadata(topic, platform_urn) method, which, given a topic name, returns a SchemaMetadata object containing the schema for that topic. Please refer to datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry for a sample implementation of this class.

class KafkaSchemaRegistryBase(ABC):
    @abstractmethod
    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        pass

The custom schema registry class can be configured using the schema_registry_class config param of the kafka source as shown below.

source:
  type: "kafka"
  config:
    # Set the custom schema registry implementation class
    schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
    # Coordinates
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: http://localhost:8081
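
To give a feel for the shape of a custom implementation, here is a self-contained sketch. It uses stand-in definitions so it runs without DataHub installed: the SchemaMetadata dataclass is a simplified placeholder rather than DataHub's real class, and InMemorySchemaRegistry is a hypothetical example. A real implementation would import both base types from the datahub package and follow whatever construction hooks your installed version expects.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SchemaMetadata:
    """Simplified stand-in for DataHub's SchemaMetadata (illustration only)."""

    schemaName: str
    platform: str
    fields: List[str] = field(default_factory=list)


class KafkaSchemaRegistryBase(ABC):
    """Mirrors the interface shown in this section."""

    @abstractmethod
    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        pass


class InMemorySchemaRegistry(KafkaSchemaRegistryBase):
    """Hypothetical registry that serves schemas from a plain dict."""

    def __init__(self, schemas: Dict[str, List[str]]):
        self._schemas = schemas

    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        field_names = self._schemas.get(topic)
        if field_names is None:
            # No schema is known for this topic.
            return None
        return SchemaMetadata(
            schemaName=topic, platform=platform_urn, fields=list(field_names)
        )


registry = InMemorySchemaRegistry({"orders": ["id", "amount"]})
print(registry.get_schema_metadata("orders", "urn:li:dataPlatform:kafka"))
```

Returning None for an unknown topic follows the Optional return type of the interface.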

OAuth Callback

The OAuth callback function can be set up using config.connection.consumer_config.oauth_cb.

You need to specify a Python function reference in the format <python-module>:<function-name>.

For example, in the configuration oauth:create_token, create_token is a function defined in oauth.py, and oauth.py must be accessible in the PYTHONPATH.

source:
  type: "kafka"
  config:
    # Set the custom schema registry implementation class
    schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
    # Coordinates
    connection:
      bootstrap: "broker:9092"
      schema_registry_url: http://localhost:8081
      consumer_config:
        security.protocol: "SASL_PLAINTEXT"
        sasl.mechanism: "OAUTHBEARER"
        oauth_cb: "oauth:create_token"
# sink configs
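
A minimal oauth.py matching the oauth:create_token reference might look like the sketch below. It relies on the confluent-kafka convention that an oauth_cb callback receives the sasl.oauthbearer.config string and returns a (token, expiry time in seconds since the epoch) tuple. How the token is obtained is entirely an assumption here: the environment variable names KAFKA_OAUTH_TOKEN and KAFKA_OAUTH_TOKEN_TTL_SECONDS are made up, and a real callback would typically call your identity provider instead.

```python
# oauth.py -- must be importable via PYTHONPATH
import os
import time


def create_token(config_str):
    """Return (token, expiry) for the Kafka client's OAUTHBEARER login.

    config_str is the value of sasl.oauthbearer.config; it is unused in
    this sketch. The token source below is a placeholder.
    """
    token = os.environ["KAFKA_OAUTH_TOKEN"]  # hypothetical variable name
    lifetime_seconds = float(os.environ.get("KAFKA_OAUTH_TOKEN_TTL_SECONDS", "300"))
    # Expiry is expressed as seconds since the epoch, as a float.
    return token, time.time() + lifetime_seconds
```

Reading the token with os.environ[...] makes a missing token fail immediately with a KeyError instead of sending an empty token to the broker.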

Limitations of PROTOBUF schema types implementation

The current implementation of the support for PROTOBUF schema type has the following limitations:

  • Recursive types are not supported.
  • If the schemas of different topics define a type in the same package, the source raises an exception.

In addition to this, maps are represented as arrays of messages. The following message,

message MessageWithMap {
  map<int32, string> map_1 = 1;
}

becomes:

message Map1Entry {
  int32 key = 1;
  string value = 2;
}
message MessageWithMap {
  repeated Map1Entry map_1 = 1;
}
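
The same rewrite can be pictured on plain Python data. This sketch assumes the entry type name is formed by camel-casing the field name and appending Entry, which is consistent with the map_1 example above. Both helper names are hypothetical.

```python
def map_entry_type_name(field_name):
    """Derive the entry message name, e.g. "map_1" -> "Map1Entry"."""
    parts = field_name.split("_")
    return "".join(part[:1].upper() + part[1:] for part in parts) + "Entry"


def as_repeated_entries(mapping):
    """Represent a map as a list of key/value entry messages."""
    return [{"key": key, "value": value} for key, value in mapping.items()]


print(map_entry_type_name("map_1"))
print(as_repeated_entries({1: "one", 2: "two"}))
```

Each map item becomes one entry with a key and a value, mirroring the repeated Map1Entry field in the schema above.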

Enriching DataHub metadata with automated meta mapping

Note: Meta mapping is currently only available for Avro schemas, and requires that those Avro schemas are pushed to the schema registry.

Avro schemas are permitted to have additional attributes not defined by the specification as arbitrary metadata. A common pattern is to utilize this for business metadata. The Kafka source has the ability to transform this directly into DataHub Owners, Tags and Terms.

Simple tags

If you simply have a list of tags embedded into an Avro schema (either at the top-level or for an individual field), you can use the schema_tags_field config.

Example Avro schema:

{
  "name": "sampleRecord",
  "type": "record",
  "tags": ["tag1", "tag2"],
  "fields": [
    {
      "name": "field_1",
      "type": "string",
      "tags": ["tag3", "tag4"]
    }
  ]
}

The name of the field containing a list of tags can be configured with the schema_tags_field property:

config:
  schema_tags_field: tags
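
As a rough picture of what reading tags from a configurable attribute involves, the sketch below pulls the tag lists out of the example schema at both the top level and the field level. The function collect_tags is hypothetical and is not the source's implementation.

```python
import json


def collect_tags(avro_schema, tags_field="tags"):
    """Return (dataset-level tags, {field name: field-level tags})."""
    dataset_tags = list(avro_schema.get(tags_field, []))
    field_tags = {
        schema_field["name"]: list(schema_field[tags_field])
        for schema_field in avro_schema.get("fields", [])
        if tags_field in schema_field
    }
    return dataset_tags, field_tags


# The example Avro schema from this section.
schema = json.loads(
    """
    {
      "name": "sampleRecord",
      "type": "record",
      "tags": ["tag1", "tag2"],
      "fields": [
        {"name": "field_1", "type": "string", "tags": ["tag3", "tag4"]}
      ]
    }
    """
)

dataset_tags, field_tags = collect_tags(schema, tags_field="tags")
print(dataset_tags, field_tags)
```

Passing a different tags_field value corresponds to changing schema_tags_field in the recipe.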

Meta mapping

You can also map specific Avro fields into Owners, Tags and Terms using meta mapping.

Example Avro schema:

{
  "name": "sampleRecord",
  "type": "record",
  "owning_team": "@Data-Science",
  "data_tier": "Bronze",
  "fields": [
    {
      "name": "field_1",
      "type": "string",
      "gdpr": {
        "pii": true
      }
    }
  ]
}

This can be mapped to DataHub metadata with meta_mapping config:

config:
  meta_mapping:
    owning_team:
      match: "^@(.*)"
      operation: "add_owner"
      config:
        owner_type: group
    data_tier:
      match: "Bronze|Silver|Gold"
      operation: "add_term"
      config:
        term: "{{ $match }}"
  field_meta_mapping:
    gdpr.pii:
      match: true
      operation: "add_tag"
      config:
        tag: "pii"

The underlying implementation is similar to dbt meta mapping, which has more detailed examples that can be used for reference.
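
To illustrate how the data_tier and gdpr.pii rules could be evaluated, here is a simplified sketch. It assumes that a dotted key such as gdpr.pii is resolved by walking nested properties, that string match values are treated as regular expressions, that non-string match values are compared for equality, and that {{ $match }} is replaced with the matched text. The helpers lookup and apply_rule are hypothetical and gloss over details of the real processor, including how owners are derived.

```python
import re


def lookup(properties, dotted_key):
    """Resolve a dotted key such as "gdpr.pii" against nested dicts."""
    value = properties
    for part in dotted_key.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def apply_rule(value, rule):
    """Return (operation, resolved config) if the rule matches, else None."""
    if value is None:
        return None
    expected = rule["match"]
    if isinstance(expected, str):
        found = re.match(expected, str(value))
        if not found:
            return None
        matched_text = found.group(0)
    else:
        if value != expected:
            return None
        matched_text = str(value)
    resolved = {
        key: item.replace("{{ $match }}", matched_text) if isinstance(item, str) else item
        for key, item in rule["config"].items()
    }
    return rule["operation"], resolved


tier_rule = {"match": "Bronze|Silver|Gold", "operation": "add_term", "config": {"term": "{{ $match }}"}}
pii_rule = {"match": True, "operation": "add_tag", "config": {"tag": "pii"}}
field_1 = {"name": "field_1", "type": "string", "gdpr": {"pii": True}}

print(apply_rule("Bronze", tier_rule))
print(apply_rule(lookup(field_1, "gdpr.pii"), pii_rule))
```

With the example schema, the data_tier value Bronze resolves to an add_term operation for the term Bronze, and the gdpr.pii flag resolves to an add_tag operation for the tag pii.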

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka.kafka.KafkaSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Kafka, feel free to ping us on our Slack.