Adding Extractors
LineageBridge is designed to be extensible. This guide shows you how to add new extractors for Confluent Cloud services or new catalog providers for data warehouses.
Overview
LineageBridge has two main extension points:
- LineageExtractor - For extracting lineage from Confluent Cloud services (Kafka, ksqlDB, Flink, etc.)
- CatalogProvider - For integrating with data catalogs (Unity Catalog, AWS Glue, BigQuery, etc.)
Adding a Confluent Service Extractor
The LineageExtractor Protocol
All extractors implement the LineageExtractor protocol defined in lineage_bridge/clients/protocol.py:
from typing import Protocol
from lineage_bridge.models.graph import LineageNode, LineageEdge
class LineageExtractor(Protocol):
"""Interface for API-specific lineage extractors."""
async def extract(self) -> tuple[list[LineageNode], list[LineageEdge]]:
"""Extract nodes and edges from this API surface.
Returns:
A tuple of (nodes, edges) discovered by this extractor.
"""
...
That's it! Implement one async method that returns nodes and edges.
Step 1: Create the Client Module
Create a new file in lineage_bridge/clients/ following the naming pattern <service>_client.py:
# lineage_bridge/clients/my_service.py
"""MyService client - extracts lineage from Confluent MyService API."""
from __future__ import annotations
import logging
from typing import Any
from lineage_bridge.clients.base import ConfluentClient
from lineage_bridge.models.graph import (
EdgeType,
LineageEdge,
LineageNode,
NodeType,
SystemType,
)
logger = logging.getLogger(__name__)
class MyServiceClient(ConfluentClient):
"""Extracts lineage from Confluent MyService API."""
def __init__(
self,
base_url: str,
api_key: str,
api_secret: str,
cluster_id: str,
environment_id: str,
*,
timeout: float = 30.0,
) -> None:
super().__init__(base_url, api_key, api_secret, timeout=timeout)
self.cluster_id = cluster_id
self.environment_id = environment_id
async def extract(self) -> tuple[list[LineageNode], list[LineageEdge]]:
"""Extract nodes and edges from MyService API."""
nodes: list[LineageNode] = []
edges: list[LineageEdge] = []
# 1. Fetch resources from API
resources = await self._list_resources()
# 2. Build nodes
for resource in resources:
node = LineageNode(
node_id=self._build_node_id(resource["id"]),
system=SystemType.CONFLUENT,
node_type=NodeType.KAFKA_TOPIC, # Or your custom type
qualified_name=resource["name"],
display_name=resource["name"],
environment_id=self.environment_id,
cluster_id=self.cluster_id,
url=self._build_url(resource["id"]),
attributes={
"status": resource.get("status"),
"created_at": resource.get("created_at"),
},
)
nodes.append(node)
# 3. Build edges (e.g., from source/sink relationships)
for resource in resources:
if source_id := resource.get("source_topic"):
edge = LineageEdge(
src_id=self._build_topic_id(source_id),
dst_id=self._build_node_id(resource["id"]),
edge_type=EdgeType.CONSUMES,
)
edges.append(edge)
logger.info(
"MyService extracted %d nodes, %d edges from cluster %s",
len(nodes),
len(edges),
self.cluster_id,
)
return nodes, edges
# Helper methods
def _build_node_id(self, resource_id: str) -> str:
return f"confluent:my_service:{self.environment_id}:{resource_id}"
def _build_topic_id(self, topic_name: str) -> str:
return f"confluent:kafka_topic:{self.environment_id}:{topic_name}"
def _build_url(self, resource_id: str) -> str:
return (
f"https://confluent.cloud/environments/{self.environment_id}"
f"/clusters/{self.cluster_id}/my-service/{resource_id}"
)
# API methods
async def _list_resources(self) -> list[dict[str, Any]]:
"""Fetch resources from MyService API."""
path = f"/my-service/v1/clusters/{self.cluster_id}/resources"
return await self.paginate(path)
Step 2: Follow the KafkaAdminClient Pattern
Use lineage_bridge/clients/kafka_admin.py as a reference. It demonstrates:
- Extending
ConfluentClientfor automatic retry and pagination - Building node IDs in the format
{system}:{type}:{env_id}:{qualified_name} - Using helper methods for ID and URL construction
- Proper logging and error handling
- Handling pagination with
self.paginate(path)
Key patterns from KafkaAdminClient:
# Node ID format
def _topic_node_id(self, topic_name: str) -> str:
return f"confluent:kafka_topic:{self.environment_id}:{topic_name}"
# URL construction
def _topic_url(self, topic_name: str) -> str:
return (
f"https://confluent.cloud/environments/{self.environment_id}"
f"/clusters/{self.cluster_id}/topics/{topic_name}"
)
# Pagination
topics = await self._list_topics()
async def _list_topics(self) -> list[dict[str, Any]]:
path = f"/kafka/v3/clusters/{self.cluster_id}/topics"
return await self.paginate(path)
Step 3: Register in the Orchestrator
Add your extractor to lineage_bridge/extractors/orchestrator.py:
from lineage_bridge.clients.my_service import MyServiceClient
# In the extraction phase where it belongs
async def _extract_my_service(self) -> None:
"""Extract lineage from MyService."""
client = MyServiceClient(
base_url=self.settings.cloud_base_url,
api_key=self.settings.cloud_api_key,
api_secret=self.settings.cloud_api_secret.get_secret_value(),
cluster_id=self.cluster_id,
environment_id=self.environment_id,
)
nodes, edges = await client.extract()
for node in nodes:
self.graph.add_node(node)
for edge in edges:
self.graph.add_edge(edge)
Step 4: Write Tests
Create tests/unit/test_my_service.py:
import httpx
import pytest
import respx
from lineage_bridge.clients.my_service import MyServiceClient
from lineage_bridge.models.graph import NodeType
@pytest.fixture()
def my_service_client():
return MyServiceClient(
base_url="https://api.confluent.cloud",
api_key="test-key",
api_secret="test-secret",
cluster_id="lkc-123",
environment_id="env-abc",
)
@respx.mock
async def test_extract_resources(my_service_client):
"""Test MyServiceClient extracts resources."""
# Mock API response
respx.get("https://api.confluent.cloud/my-service/v1/clusters/lkc-123/resources").mock(
return_value=httpx.Response(
200,
json={
"data": [
{
"id": "res-1",
"name": "my-resource",
"status": "ACTIVE",
"source_topic": "input-topic",
}
],
"metadata": {"next": None}
}
)
)
nodes, edges = await my_service_client.extract()
# Assertions
assert len(nodes) == 1
assert nodes[0].display_name == "my-resource"
assert nodes[0].node_type == NodeType.MY_SERVICE_RESOURCE
assert len(edges) == 1
assert edges[0].edge_type == EdgeType.CONSUMES
See Testing Guide for more details on writing tests.
Adding a Catalog Provider
The CatalogProvider Protocol
Catalog providers implement the CatalogProvider protocol defined in lineage_bridge/catalogs/protocol.py:
from typing import Protocol, Any
from lineage_bridge.models.graph import (
LineageNode,
LineageEdge,
LineageGraph,
NodeType,
SystemType,
)
class CatalogProvider(Protocol):
"""Interface for data catalog integrations."""
catalog_type: str # e.g., "UNITY_CATALOG", "AWS_GLUE"
node_type: NodeType # The NodeType this provider creates
system_type: SystemType # The SystemType for nodes
def build_node(
self,
ci_config: dict[str, Any],
tableflow_node_id: str,
topic_name: str,
cluster_id: str,
environment_id: str,
) -> tuple[LineageNode, LineageEdge]:
"""Create a catalog node + MATERIALIZES edge."""
...
async def enrich(self, graph: LineageGraph) -> None:
"""Enrich catalog nodes with metadata from the catalog's API."""
...
def build_url(self, node: LineageNode) -> str | None:
"""Build a deep link URL to this node in the catalog's UI."""
...
Step 1: Create the Provider Module
Create lineage_bridge/catalogs/my_catalog.py:
# lineage_bridge/catalogs/my_catalog.py
"""MyCatalog provider - integrates with MyCatalog API."""
from __future__ import annotations
import logging
from typing import Any
import httpx
from lineage_bridge.models.graph import (
EdgeType,
LineageEdge,
LineageGraph,
LineageNode,
NodeType,
SystemType,
)
logger = logging.getLogger(__name__)
class MyCatalogProvider:
"""CatalogProvider implementation for MyCatalog."""
catalog_type: str = "MY_CATALOG"
node_type: NodeType = NodeType.MY_CATALOG_TABLE
system_type: SystemType = SystemType.EXTERNAL # Or create a new one
def __init__(
self,
api_url: str | None = None,
api_token: str | None = None,
) -> None:
self._api_url = api_url.rstrip("/") if api_url else None
self._api_token = api_token
def build_node(
self,
ci_config: dict[str, Any],
tableflow_node_id: str,
topic_name: str,
cluster_id: str,
environment_id: str,
) -> tuple[LineageNode, LineageEdge]:
"""Create a catalog table node + MATERIALIZES edge."""
# Extract catalog config
catalog_config = ci_config.get("my_catalog", {})
database = catalog_config.get("database", "default")
schema = catalog_config.get("schema", cluster_id)
# Build qualified name (catalog-specific format)
qualified_name = f"{database}.{schema}.{topic_name}"
node_id = f"my_catalog:table:{environment_id}:{qualified_name}"
# Create node
node = LineageNode(
node_id=node_id,
system=self.system_type,
node_type=self.node_type,
qualified_name=qualified_name,
display_name=qualified_name,
environment_id=environment_id,
cluster_id=cluster_id,
attributes={
"database": database,
"schema": schema,
"table": topic_name,
},
)
# Create MATERIALIZES edge from tableflow to catalog
edge = LineageEdge(
src_id=tableflow_node_id,
dst_id=node_id,
edge_type=EdgeType.MATERIALIZES,
)
return node, edge
async def enrich(self, graph: LineageGraph) -> None:
"""Fetch table metadata from MyCatalog API."""
if not self._api_url or not self._api_token:
logger.debug("MyCatalog enrichment skipped - no credentials")
return
catalog_nodes = graph.filter_by_type(self.node_type)
if not catalog_nodes:
return
async with httpx.AsyncClient(
base_url=self._api_url,
headers={"Authorization": f"Bearer {self._api_token}"},
timeout=30.0,
) as client:
for node in catalog_nodes:
await self._enrich_node(client, graph, node)
async def _enrich_node(
self,
client: httpx.AsyncClient,
graph: LineageGraph,
node: LineageNode,
) -> None:
"""Fetch and merge metadata for a single table."""
qualified_name = node.qualified_name
url = f"/api/v1/tables/{qualified_name}"
try:
resp = await client.get(url)
if resp.status_code == 200:
data = resp.json()
enriched_attrs = {
**node.attributes,
"owner": data.get("owner"),
"description": data.get("description"),
"columns": data.get("columns"),
"row_count": data.get("row_count"),
}
enriched = node.model_copy(update={"attributes": enriched_attrs})
graph.add_node(enriched)
else:
logger.debug(
"MyCatalog API returned %d for %s",
resp.status_code,
qualified_name,
)
except httpx.HTTPError as exc:
logger.debug("HTTP error enriching %s: %s", qualified_name, exc)
def build_url(self, node: LineageNode) -> str | None:
"""Build a deep link to the table in MyCatalog UI."""
if not self._api_url:
return None
database = node.attributes.get("database")
schema = node.attributes.get("schema")
table = node.attributes.get("table")
if not all([database, schema, table]):
return None
return f"{self._api_url}/databases/{database}/schemas/{schema}/tables/{table}"
Step 2: Follow the DatabricksUCProvider Pattern
Use lineage_bridge/catalogs/databricks_uc.py as a reference. It demonstrates:
- Implementing all protocol methods
- Handling credentials (optional, graceful degradation)
- Async enrichment with httpx
- Building deep links to catalog UI
- Error handling and retry logic
Key patterns from DatabricksUCProvider:
# build_node creates the initial node from Tableflow config
def build_node(self, ci_config, tableflow_node_id, topic_name, cluster_id, environment_id):
uc_cfg = ci_config.get("unity_catalog", {})
catalog_name = uc_cfg.get("catalog_name", "default")
qualified = f"{catalog_name}.{cluster_id}.{topic_name}"
# ... create node and edge
return node, edge
# enrich fetches additional metadata from the catalog's API
async def enrich(self, graph):
if not self._workspace_url or not self._token:
return # Gracefully skip if no credentials
# Post-ADR-021, all catalog tables share NodeType.CATALOG_TABLE; the
# `catalog_type` discriminator picks the per-provider variant.
uc_nodes = graph.filter_catalog_nodes("UNITY_CATALOG")
async with httpx.AsyncClient(...) as client:
for node in uc_nodes:
await self._enrich_node(client, graph, node)
# build_url creates deep links
def build_url(self, node):
workspace_url = node.attributes.get("workspace_url")
catalog, schema, table = node.qualified_name.split(".")
return f"{workspace_url}/explore/data/{catalog}/{schema}/{table}"
Step 3: Register the Provider
Add your provider to lineage_bridge/catalogs/__init__.py:
from lineage_bridge.catalogs.my_catalog import MyCatalogProvider
__all__ = [
"DatabricksUCProvider",
"AWSGlueProvider",
"MyCatalogProvider", # Add here
]
Register it in the Tableflow extractor (lineage_bridge/clients/tableflow.py):
from lineage_bridge.catalogs.my_catalog import MyCatalogProvider
# In _initialize_providers method
if self.settings.my_catalog_api_url:
self._providers["MY_CATALOG"] = MyCatalogProvider(
api_url=self.settings.my_catalog_api_url,
api_token=self.settings.my_catalog_api_token,
)
Step 4: Add Settings
Add configuration to lineage_bridge/config/settings.py:
class Settings(BaseSettings):
# ... existing settings ...
# MyCatalog
my_catalog_api_url: str | None = Field(None, env="LINEAGE_BRIDGE_MY_CATALOG_API_URL")
my_catalog_api_token: SecretStr | None = Field(
None, env="LINEAGE_BRIDGE_MY_CATALOG_API_TOKEN"
)
Step 5: Write Tests
Create tests/unit/test_my_catalog.py:
import httpx
import pytest
import respx
from lineage_bridge.catalogs.my_catalog import MyCatalogProvider
from lineage_bridge.models.graph import LineageGraph, NodeType
@pytest.fixture()
def provider():
return MyCatalogProvider(
api_url="https://catalog.example.com",
api_token="test-token",
)
def test_build_node(provider):
"""Test building a catalog node from Tableflow config."""
ci_config = {
"my_catalog": {
"database": "my_db",
"schema": "my_schema",
}
}
node, edge = provider.build_node(
ci_config=ci_config,
tableflow_node_id="confluent:tableflow_table:env-1:orders",
topic_name="orders",
cluster_id="lkc-123",
environment_id="env-1",
)
assert node.qualified_name == "my_db.my_schema.orders"
assert node.node_type == NodeType.MY_CATALOG_TABLE
assert edge.edge_type == EdgeType.MATERIALIZES
@respx.mock
async def test_enrich(provider, sample_node):
"""Test enriching catalog nodes with API metadata."""
graph = LineageGraph()
node = sample_node(
"my_db.my_schema.orders",
NodeType.MY_CATALOG_TABLE,
attributes={"database": "my_db", "schema": "my_schema", "table": "orders"}
)
graph.add_node(node)
# Mock API response
respx.get(
"https://catalog.example.com/api/v1/tables/my_db.my_schema.orders"
).mock(
return_value=httpx.Response(
200,
json={
"owner": "data-team",
"description": "Customer orders",
"row_count": 1000000,
}
)
)
await provider.enrich(graph)
enriched = graph.get_node(node.node_id)
assert enriched.attributes["owner"] == "data-team"
assert enriched.attributes["row_count"] == 1000000
Documentation Requirements
When adding a new extractor or catalog provider:
- Docstrings: Add comprehensive docstrings to all public methods
- Type hints: Use type hints for all function signatures
- README update: Add your provider to the main README.md
- Architecture docs: Update CLAUDE.md if adding new modules
- Example usage: Provide example configuration in docs/
Common Patterns
Error Handling
try:
data = await self._fetch_resource(resource_id)
except httpx.HTTPError as exc:
logger.warning("Failed to fetch resource %s: %s", resource_id, exc)
return nodes, edges # Return partial results
Retry with Backoff
for attempt in range(MAX_RETRIES):
try:
resp = await client.get(url)
if resp.status_code == 200:
return resp.json()
if resp.status_code in (429, 500, 502, 503, 504):
delay = BACKOFF_BASE * (2 ** attempt)
await asyncio.sleep(delay)
continue
except httpx.HTTPError:
if attempt < MAX_RETRIES - 1:
await asyncio.sleep(BACKOFF_BASE * (2 ** attempt))
Pagination
# ConfluentClient provides paginate() helper
async def _list_resources(self) -> list[dict[str, Any]]:
path = "/api/v1/resources"
return await self.paginate(path)
# Manual pagination for non-Confluent APIs
async def _paginate_custom(self, url: str) -> list[dict]:
items = []
next_token = None
while True:
params = {"next_token": next_token} if next_token else {}
resp = await client.get(url, params=params)
data = resp.json()
items.extend(data.get("items", []))
next_token = data.get("next_token")
if not next_token:
break
return items
Checklist
Before submitting your new extractor or catalog provider:
- [ ] Implements the protocol fully
- [ ] Follows existing patterns (KafkaAdminClient or DatabricksUCProvider)
- [ ] Includes comprehensive unit tests (>80% coverage)
- [ ] Uses proper logging (logger.info, logger.debug, logger.warning)
- [ ] Handles errors gracefully
- [ ] Updates documentation
- [ ] Registered in orchestrator or tableflow client
- [ ] Added settings to config/settings.py
- [ ] Runs
make lintandmake formatsuccessfully - [ ] All tests pass:
make test
Questions? Check the Testing Guide or open a GitHub Discussion.