Client Architecture
LineageBridge uses a layered client architecture with protocols for extensibility and a base client for common HTTP operations.
Architecture Overview
┌─────────────────────────────────────────────────┐
│ Orchestrator (extractors/) │
└──────────────┬──────────────────────────────────┘
│
┌───────┴───────┐
│ │
v v
┌─────────────┐ ┌──────────────┐
│ Confluent │ │ Catalog │
│ Clients │ │ Providers │
│ │ │ │
│ (extends │ │ (implements │
│ Confluent │ │ Catalog │
│ Client) │ │ Provider) │
└─────────────┘ └──────────────┘
│ │
v v
┌─────────────┐ ┌──────────────┐
│ httpx │ │ httpx/boto3/ │
│ AsyncClient│ │ google-auth │
└─────────────┘ └──────────────┘
ConfluentClient Base
All Confluent Cloud API clients extend ConfluentClient, which provides:
- Async HTTP client (httpx)
- Basic authentication (API key/secret)
- Exponential backoff retry (3 attempts, configurable)
- Cursor-based pagination (follows
metadata.next) - Timeout handling (30s default)
Retry Policy
Retries on: - 429 Too Many Requests - 500 Internal Server Error - 502 Bad Gateway - 503 Service Unavailable - 504 Gateway Timeout
Backoff delay: 1.0 * (2 ** attempt) seconds (1s, 2s, 4s)
Respects Retry-After header when present.
Usage Example
from lineage_bridge.clients.base import ConfluentClient
client = ConfluentClient(
base_url="https://api.confluent.cloud",
api_key="YOUR_API_KEY",
api_secret="YOUR_API_SECRET",
)
async with client:
# GET request
data = await client.get("/org/v2/environments")
# POST request
result = await client.post("/path", json_body={"key": "value"})
# Paginated request
all_items = await client.paginate("/cmk/v2/clusters", params={"environment": "env-abc123"})
Error Handling
All requests raise httpx.HTTPStatusError on non-retryable failures (e.g., 401, 403, 404).
try:
data = await client.get("/path")
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
print("Unauthorized - check API key")
elif e.response.status_code == 403:
print("Forbidden - check permissions")
elif e.response.status_code == 404:
print("Not found")
LineageExtractor Protocol
Extraction clients implement the LineageExtractor protocol:
from typing import Protocol
class LineageExtractor(Protocol):
async def extract(self) -> tuple[list[LineageNode], list[LineageEdge]]:
"""Extract lineage data and return nodes and edges."""
...
This protocol enables: - Duck typing - No inheritance required - Easy mocking - Simple to create test doubles - Extensibility - Add new extractors without modifying existing code
Implementing Extractors
from lineage_bridge.clients.base import ConfluentClient
from lineage_bridge.clients.protocol import LineageExtractor
from lineage_bridge.models.graph import LineageNode, LineageEdge
class MyExtractor(ConfluentClient):
async def extract(self) -> tuple[list[LineageNode], list[LineageEdge]]:
nodes = []
edges = []
# Fetch data from API
data = await self.get("/my/endpoint")
# Build nodes and edges
for item in data:
node = LineageNode(...)
nodes.append(node)
return nodes, edges
Confluent Clients
KafkaAdminClient
Purpose: Extract Kafka topics and consumer groups.
Extends: ConfluentClient
API Endpoints: - /kafka/v3/clusters/{cluster_id}/topics - /kafka/v3/clusters/{cluster_id}/consumer-groups - /kafka/v3/clusters/{cluster_id}/consumer-groups/{group_id}/consumers
Credentials: Cluster-scoped API key (or Cloud API key with Kafka REST permissions)
Example:
from lineage_bridge.clients.kafka_admin import KafkaAdminClient
client = KafkaAdminClient(
base_url="https://pkc-abc123.us-east-1.aws.confluent.cloud:443",
api_key="KAFKA_KEY",
api_secret="KAFKA_SECRET",
cluster_id="lkc-xyz789",
environment_id="env-abc123",
)
async with client:
nodes, edges = await client.extract()
ConnectClient
Purpose: Extract Kafka Connect connectors and external datasets.
Extends: ConfluentClient
API Endpoints: - /connect/v1/environments/{env_id}/clusters - /connect/v1/environments/{env_id}/clusters/{cluster_id}/connectors
Credentials: Cloud API key
Example:
from lineage_bridge.clients.connect import ConnectClient
client = ConnectClient(
api_key="CLOUD_API_KEY",
api_secret="CLOUD_API_SECRET",
environment_id="env-abc123",
kafka_cluster_id="lkc-xyz789",
)
async with client:
nodes, edges = await client.extract()
KsqlDBClient
Purpose: Extract ksqlDB queries, streams, and tables.
Extends: ConfluentClient
API Endpoints: - /ksqldb/v1/organizations/{org_id}/environments/{env_id}/clusters - /ksqldb/v2/clusters/{cluster_id}/ksql (query introspection)
Credentials: Cloud API key + ksqlDB API key (optional)
Example:
from lineage_bridge.clients.ksqldb import KsqlDBClient
client = KsqlDBClient(
cloud_api_key="CLOUD_API_KEY",
cloud_api_secret="CLOUD_API_SECRET",
environment_id="env-abc123",
ksqldb_api_key="KSQLDB_KEY", # Optional
ksqldb_api_secret="KSQLDB_SECRET",
)
async with client:
nodes, edges = await client.extract()
FlinkClient
Purpose: Extract Flink SQL statements and parse SQL for lineage.
Extends: ConfluentClient
API Endpoints: - /sql/v1/organizations/{org_id}/environments/{env_id}/statements
Credentials: Cloud API key + Flink API key (optional)
SQL Parsing: Uses sqlglot to parse Flink SQL and extract table references.
Example:
from lineage_bridge.clients.flink import FlinkClient
client = FlinkClient(
cloud_api_key="CLOUD_API_KEY",
cloud_api_secret="CLOUD_API_SECRET",
environment_id="env-abc123",
organization_id="org-xyz",
flink_api_key="FLINK_KEY", # Optional
flink_api_secret="FLINK_SECRET",
)
async with client:
nodes, edges = await client.extract()
SchemaRegistryClient
Purpose: Extract schema metadata and link to topics.
Extends: ConfluentClient
API Endpoints: - /subjects - /subjects/{subject}/versions/latest
Credentials: Schema Registry API key (or Cloud API key)
Example:
from lineage_bridge.clients.schema_registry import SchemaRegistryClient
client = SchemaRegistryClient(
base_url="https://psrc-abc123.us-east-1.aws.confluent.cloud",
api_key="SR_KEY",
api_secret="SR_SECRET",
environment_id="env-abc123",
)
async with client:
nodes, edges = await client.extract()
StreamCatalogClient
Purpose: Enrich topic nodes with business metadata and tags.
Extends: ConfluentClient
API Endpoints: - /catalog/v1/entity/type/kafka_topic - /catalog/v1/entity/type/sr_schema
Credentials: Schema Registry API key (Stream Catalog is part of Schema Registry)
Behavior: Mutates graph in-place, returns empty lists.
Example:
from lineage_bridge.clients.stream_catalog import StreamCatalogClient
client = StreamCatalogClient(
base_url="https://psrc-abc123.us-east-1.aws.confluent.cloud",
api_key="SR_KEY",
api_secret="SR_SECRET",
environment_id="env-abc123",
)
async with client:
await client.enrich(graph) # Mutates graph
TableflowClient
Purpose: Map Kafka topics to data catalog tables.
Extends: ConfluentClient
API Endpoints: - /tableflow/v1/environments/{env_id}/integrations - /tableflow/v1/environments/{env_id}/topic-table-mappings
Credentials: Cloud API key (or Tableflow API key)
Provider Delegation: Calls CatalogProvider.build_node() to create catalog-specific nodes.
Example:
from lineage_bridge.clients.tableflow import TableflowClient
client = TableflowClient(
api_key="CLOUD_API_KEY",
api_secret="CLOUD_API_SECRET",
environment_id="env-abc123",
cluster_ids=["lkc-xyz789"],
)
async with client:
nodes, edges = await client.extract()
MetricsClient
Purpose: Enrich topic nodes with real-time throughput metrics.
Extends: ConfluentClient
API Endpoints: - /v2/metrics/cloud/query (Confluent Metrics API)
Credentials: Cloud API key
Behavior: Mutates graph in-place, returns count of enriched nodes.
Example:
from lineage_bridge.clients.metrics import MetricsClient
client = MetricsClient(
api_key="CLOUD_API_KEY",
api_secret="CLOUD_API_SECRET",
lookback_hours=1,
)
async with client:
enriched = await client.enrich(graph, cluster_id="lkc-xyz789")
CatalogProvider Protocol
Catalog integrations implement the CatalogProvider protocol:
from typing import Protocol
class CatalogProvider(Protocol):
catalog_type: str # "UNITY_CATALOG", "AWS_GLUE", "GOOGLE_DATA_LINEAGE"
def build_node(
self,
catalog_name: str,
schema_name: str,
table_name: str,
environment_id: str,
) -> tuple[LineageNode, LineageEdge]:
"""Create a catalog table node and MATERIALIZES edge."""
...
async def enrich(self, graph: LineageGraph) -> None:
"""Backfill metadata from the catalog API."""
...
def build_url(self, node: LineageNode) -> str:
"""Generate a deep link to the catalog UI."""
...
async def push_lineage(
self,
graph: LineageGraph,
*,
on_progress: Callable[[str, str], None] | None = None,
) -> PushResult:
"""Write lineage metadata back to the catalog."""
...
Databricks UC Provider
HTTP Client: httpx.AsyncClient with Bearer token
Authentication: workspace_url + token
API Endpoints: - /api/2.1/unity-catalog/tables/{full_name} (metadata) - /api/2.0/sql/statements (lineage push via SQL)
Example:
from lineage_bridge.catalogs.databricks_uc import DatabricksUCProvider
provider = DatabricksUCProvider(
workspace_url="https://myworkspace.cloud.databricks.com",
token="dapi...",
)
await provider.enrich(graph)
AWS Glue Provider
HTTP Client: boto3.client("glue") (sync, wrapped with asyncio.to_thread())
Authentication: boto3 auto-discovers credentials (env vars, ~/.aws/credentials, IAM roles)
API Endpoints: - glue:GetTable (metadata) - glue:UpdateTable (lineage push)
Example:
from lineage_bridge.catalogs.aws_glue import GlueCatalogProvider
provider = GlueCatalogProvider(region="us-east-1")
await provider.enrich(graph)
Google Data Lineage Provider
HTTP Client: httpx.AsyncClient with Application Default Credentials
Authentication: project_id + location (ADC via google-auth)
API Endpoints: - BigQuery: /bigquery/v2/projects/{project}/datasets/{dataset}/tables/{table} (metadata) - Data Lineage: /v1/projects/{project}/locations/{location}:processOpenLineageRunEvent (lineage push)
Example:
from lineage_bridge.catalogs.google_lineage import GoogleLineageProvider
provider = GoogleLineageProvider(
project_id="my-project",
location="us-central1",
)
await provider.enrich(graph)
Credential Resolution
Confluent Cloud
- Cloud API key:
LINEAGE_BRIDGE_CONFLUENT_CLOUD_API_KEY/LINEAGE_BRIDGE_CONFLUENT_CLOUD_API_SECRET - Cluster API key:
LINEAGE_BRIDGE_KAFKA_API_KEY/LINEAGE_BRIDGE_KAFKA_API_SECRET(optional) - Schema Registry:
LINEAGE_BRIDGE_SCHEMA_REGISTRY_API_KEY/LINEAGE_BRIDGE_SCHEMA_REGISTRY_API_SECRET(optional) - ksqlDB:
LINEAGE_BRIDGE_KSQLDB_API_KEY/LINEAGE_BRIDGE_KSQLDB_API_SECRET(optional) - Flink:
LINEAGE_BRIDGE_FLINK_API_KEY/LINEAGE_BRIDGE_FLINK_API_SECRET(optional)
Databricks
- UC:
LINEAGE_BRIDGE_DATABRICKS_WORKSPACE_URL/LINEAGE_BRIDGE_DATABRICKS_TOKEN - Warehouse ID:
LINEAGE_BRIDGE_DATABRICKS_WAREHOUSE_ID(optional, auto-discovered)
AWS
- Glue:
AWS_REGION(boto3 auto-discovers credentials from environment)
- BigQuery/Data Lineage:
LINEAGE_BRIDGE_GCP_PROJECT_ID/LINEAGE_BRIDGE_GCP_LOCATION(ADC auto-discovers credentials)
Timeout and Retry Configuration
Per-Extractor Timeout
Orchestrator wraps all extractors in asyncio.wait_for(coro, timeout=120).
HTTP Request Timeout
ConfluentClient uses a 30s default timeout per request:
Retry Configuration
Next Steps
- Extraction Pipeline - How clients are orchestrated
- Graph Model - Node/edge types
- Troubleshooting: API Errors