Graph Model
LineageBridge represents data lineage as a directed graph in which nodes are data assets or processing steps and edges are data-flow relationships between them.
Data Structures
The graph model is built on three core structures: two Pydantic models (LineageNode and LineageEdge) and the LineageGraph container that holds them.
LineageNode
Represents a data asset or processing step in the lineage graph.
class LineageNode(BaseModel):
    node_id: str  # Unique ID: {system}:{type}:{env_id}:{qualified_name}
    system: SystemType  # CONFLUENT, DATABRICKS, AWS, GOOGLE, EXTERNAL
    node_type: NodeType  # See NodeType enum below
    catalog_type: str | None  # Catalog discriminator (v0.5.0); None for non-catalog nodes
    qualified_name: str  # Fully qualified name (e.g., catalog.schema.table)
    display_name: str  # Human-readable name
    environment_id: str | None  # Confluent environment ID
    environment_name: str | None  # Environment display name
    cluster_id: str | None  # Kafka cluster ID
    cluster_name: str | None  # Cluster display name
    attributes: dict[str, Any]  # Extensible metadata
    tags: list[str]  # Business tags
    url: str | None  # Deep link to UI
    first_seen: datetime  # First observed timestamp
    last_seen: datetime  # Last observed timestamp
LineageEdge
Represents a directed data flow relationship between two nodes.
class LineageEdge(BaseModel):
    src_id: str  # Source node ID
    dst_id: str  # Destination node ID
    edge_type: EdgeType  # PRODUCES, CONSUMES, TRANSFORMS, etc.
    confidence: float  # 1.0 = deterministic, <1.0 = inferred
    attributes: dict[str, Any]  # Extensible metadata
    first_seen: datetime  # First observed timestamp
    last_seen: datetime  # Last observed timestamp
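The confidence field separates deterministic edges from inferred ones. A minimal sketch of partitioning on it follows; EdgeStub is a hypothetical stand-in for LineageEdge, and its 1.0 default is an assumption made for the example rather than documented behavior.

```python
from dataclasses import dataclass


@dataclass
class EdgeStub:
    """Hypothetical stand-in for LineageEdge with only the fields used here."""
    src_id: str
    dst_id: str
    edge_type: str
    confidence: float = 1.0  # assumed default, for illustration only


def split_by_confidence(edges):
    """Partition edges into (deterministic, inferred) by their confidence."""
    deterministic = [e for e in edges if e.confidence == 1.0]
    inferred = [e for e in edges if e.confidence < 1.0]
    return deterministic, inferred


edges = [
    EdgeStub("mysql-source", "orders", "produces"),
    EdgeStub("orders", "enriched-orders", "transforms", confidence=0.7),
]
deterministic, inferred = split_by_confidence(edges)
print(len(deterministic), len(inferred))  # 1 1
```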
LineageGraph
In-memory graph container backed by NetworkX DiGraph.
class LineageGraph:
    def add_node(self, node: LineageNode) -> None: ...
    def add_edge(self, edge: LineageEdge) -> None: ...
    def get_node(self, node_id: str) -> LineageNode | None: ...
    def get_edge(self, src_id: str, dst_id: str, edge_type: EdgeType) -> LineageEdge | None: ...
    def get_neighbors(self, node_id: str, direction: str) -> list[LineageNode]: ...
    def upstream(self, node_id: str, hops: int) -> list[LineageNode]: ...
    def downstream(self, node_id: str, hops: int) -> list[LineageNode]: ...
    def filter_by_type(self, node_type: NodeType) -> list[LineageNode]: ...
    def filter_by_env(self, environment_id: str) -> list[LineageNode]: ...
    def search_nodes(self, query: str) -> list[LineageNode]: ...
    def validate(self) -> list[str]: ...
    def to_dict(self) -> dict[str, Any]: ...
    def to_json_file(self, path: str) -> None: ...
Node ID Format
All node IDs follow a consistent format:
{system}:{type}:{env_id}:{qualified_name}
Examples:
- Kafka topic: confluent:kafka_topic:env-abc123:my-topic
- Connector: confluent:connector:env-abc123:lkc-xyz789:mysql-source
- ksqlDB query: confluent:ksqldb_query:env-abc123:lksqlc-abc:QUERY_123
- Flink job: confluent:flink_job:env-abc123:lfcp-xyz:my-statement
- UC table: databricks:uc_table:env-abc123:main.sales.orders (see "Catalog node IDs" note below)
- Glue table: aws:glue_table:env-abc123:my_database.my_table
- Google table: google:google_table:env-abc123:project.dataset.table
- Schema: confluent:schema:env-abc123:my-topic-value:v1
- External dataset: external:external_dataset:env-abc123:s3://bucket/path
- Consumer group: confluent:consumer_group:env-abc123:lkc-xyz789:my-group
Catalog node IDs (v0.5.0): ADR-021 collapsed the per-catalog NodeType values into a single CATALOG_TABLE, but the node-ID segments above were left intentionally unchanged (uc_table / glue_table / google_table) for ID stability: graphs serialised before the rename still resolve to the right keys. The runtime discriminator is the catalog_type field on the node (UNITY_CATALOG / AWS_GLUE / etc.), not the ID-segment string.
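Because the qualified name can itself contain colons (for example lkc-xyz789:mysql-source or s3://bucket/path), a parser should split only on the first three separators. The sketch below illustrates that; build_node_id, parse_node_id and ParsedNodeId are hypothetical helpers written for this example, not part of the LineageBridge API.

```python
from typing import NamedTuple


class ParsedNodeId(NamedTuple):
    """Hypothetical container for the four documented ID segments."""
    system: str
    type_segment: str
    env_id: str
    qualified_name: str


def build_node_id(system: str, type_segment: str, env_id: str, qualified_name: str) -> str:
    """Join the four segments with ':' in the documented order."""
    return f"{system}:{type_segment}:{env_id}:{qualified_name}"


def parse_node_id(node_id: str) -> ParsedNodeId:
    """Split on the first three colons so the qualified name keeps its own colons."""
    parts = node_id.split(":", 3)
    if len(parts) != 4:
        raise ValueError(f"expected 4 segments, got {len(parts)}: {node_id!r}")
    return ParsedNodeId(*parts)


parsed = parse_node_id("external:external_dataset:env-abc123:s3://bucket/path")
print(parsed.qualified_name)  # s3://bucket/path
```

Note that type_segment is the ID segment (such as uc_table), which for catalog tables differs from the runtime NodeType value.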
NodeType Enum
class NodeType(StrEnum):
    KAFKA_TOPIC = "kafka_topic"
    CONNECTOR = "connector"
    KSQLDB_QUERY = "ksqldb_query"
    FLINK_JOB = "flink_job"
    TABLEFLOW_TABLE = "tableflow_table"
    CATALOG_TABLE = "catalog_table"  # v0.5.0: collapses UC/Glue/Google
    SCHEMA = "schema"
    EXTERNAL_DATASET = "external_dataset"
    CONSUMER_GROUP = "consumer_group"
The companion LineageNode.catalog_type: str | None discriminates among catalogs (UNITY_CATALOG, AWS_GLUE, GOOGLE_DATA_LINEAGE, AWS_DATAZONE, future SNOWFLAKE / WATSONX). It's None for non-catalog node types.
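As a rough illustration of using the discriminator, the sketch below groups catalog tables by catalog_type instead of inspecting the node-ID segment. CatalogNodeStub is a hypothetical stand-in for LineageNode that carries only the three fields the example needs.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class CatalogNodeStub:
    """Hypothetical stand-in for LineageNode."""
    node_id: str
    node_type: str
    catalog_type: Optional[str] = None  # None for non-catalog node types


def group_by_catalog(nodes):
    """Map each catalog_type to the IDs of its CATALOG_TABLE nodes."""
    groups = defaultdict(list)
    for node in nodes:
        # Branch on node_type + catalog_type, never on the ID segment string.
        if node.node_type == "catalog_table" and node.catalog_type is not None:
            groups[node.catalog_type].append(node.node_id)
    return dict(groups)


nodes = [
    CatalogNodeStub("databricks:uc_table:env-abc123:main.sales.orders", "catalog_table", "UNITY_CATALOG"),
    CatalogNodeStub("aws:glue_table:env-abc123:my_database.my_table", "catalog_table", "AWS_GLUE"),
    CatalogNodeStub("confluent:kafka_topic:env-abc123:orders", "kafka_topic"),
]
print(sorted(group_by_catalog(nodes)))  # ['AWS_GLUE', 'UNITY_CATALOG']
```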
Node Type Descriptions
| Type | Description | Example |
|---|---|---|
| KAFKA_TOPIC | Kafka topic in a cluster | orders, user-events |
| CONNECTOR | Kafka Connect source or sink | mysql-source, s3-sink |
| KSQLDB_QUERY | ksqlDB persistent query, stream, or table | CSAS_ORDERS_0, USER_STREAM |
| FLINK_JOB | Flink SQL statement | process-clickstream |
| TABLEFLOW_TABLE | Tableflow integration table | Intermediate mapping node |
| CATALOG_TABLE | Catalog table: UC / Glue / BigQuery / DataZone (discriminated by catalog_type) | main.sales.orders (UC), my_database.my_table (Glue), project.dataset.table (BigQuery) |
| SCHEMA | Schema Registry schema version | orders-value, users-key |
| EXTERNAL_DATASET | External system (S3, database, etc.) | s3://bucket/path, mysql://host/db.table |
| CONSUMER_GROUP | Kafka consumer group | payment-processor, analytics |
EdgeType Enum
class EdgeType(StrEnum):
    PRODUCES = "produces"
    CONSUMES = "consumes"
    TRANSFORMS = "transforms"
    MATERIALIZES = "materializes"
    HAS_SCHEMA = "has_schema"
    MEMBER_OF = "member_of"
Edge Type Descriptions
| Type | Direction | Description | Example |
|---|---|---|---|
| PRODUCES | Source → Topic | Data source writes to topic | mysql-source → orders |
| CONSUMES | Topic → Sink | Data sink reads from topic | orders → s3-sink |
| TRANSFORMS | Source → Destination | Processing step | orders → CSAS_ENRICHED_ORDERS |
| MATERIALIZES | Topic → Table | Topic materialized to catalog table | orders → main.sales.orders |
| HAS_SCHEMA | Topic → Schema | Topic uses schema | orders → orders-value:v1 |
| MEMBER_OF | Consumer Group → Topic | Consumer group subscribes to topic | analytics → orders |
SystemType Enum
class SystemType(StrEnum):
    CONFLUENT = "confluent"
    DATABRICKS = "databricks"
    AWS = "aws"
    GOOGLE = "google"
    EXTERNAL = "external"
System Type Descriptions
| System | Description | Node Types |
|---|---|---|
| CONFLUENT | Confluent Cloud | KAFKA_TOPIC, CONNECTOR, KSQLDB_QUERY, FLINK_JOB, SCHEMA, CONSUMER_GROUP, TABLEFLOW_TABLE |
| DATABRICKS | Databricks Unity Catalog | CATALOG_TABLE (catalog_type=UNITY_CATALOG) |
| AWS | AWS Glue Data Catalog / DataZone | CATALOG_TABLE (catalog_type=AWS_GLUE or AWS_DATAZONE) |
| GOOGLE | Google BigQuery / Data Lineage | CATALOG_TABLE (catalog_type=GOOGLE_DATA_LINEAGE) |
| EXTERNAL | External systems | EXTERNAL_DATASET |
Graph Operations
Adding Nodes and Edges
graph = LineageGraph()
# Add a Kafka topic node
topic = LineageNode(
node_id="confluent:kafka_topic:env-abc123:orders",
system=SystemType.CONFLUENT,
node_type=NodeType.KAFKA_TOPIC,
qualified_name="orders",
display_name="orders",
environment_id="env-abc123",
cluster_id="lkc-xyz789",
)
graph.add_node(topic)
# Add a UC table node — note CATALOG_TABLE + catalog_type discriminator (v0.5.0).
# The "uc_table" segment in node_id is intentional ID-stability legacy; the
# runtime type is CATALOG_TABLE.
table = LineageNode(
node_id="databricks:uc_table:env-abc123:main.sales.orders",
system=SystemType.DATABRICKS,
node_type=NodeType.CATALOG_TABLE,
catalog_type="UNITY_CATALOG",
qualified_name="main.sales.orders",
display_name="main.sales.orders",
)
graph.add_node(table)
# Add a MATERIALIZES edge
edge = LineageEdge(
src_id="confluent:kafka_topic:env-abc123:orders",
dst_id="databricks:uc_table:env-abc123:main.sales.orders",
edge_type=EdgeType.MATERIALIZES,
)
graph.add_edge(edge)
Querying Lineage
# Get upstream nodes (predecessors)
upstream = graph.upstream(node_id="databricks:uc_table:env-abc123:main.sales.orders", hops=2)
# Get downstream nodes (successors)
downstream = graph.downstream(node_id="confluent:kafka_topic:env-abc123:orders", hops=1)
# Get all Kafka topics
topics = graph.filter_by_type(NodeType.KAFKA_TOPIC)
# Search by name
results = graph.search_nodes("orders")
# Get neighbors
neighbors = graph.get_neighbors(
node_id="confluent:kafka_topic:env-abc123:orders",
direction="both" # "upstream", "downstream", or "both"
)
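To make the hops parameter concrete, here is a sketch of a hop-limited upstream traversal over plain edge tuples. It assumes hops caps the breadth-first search depth; the real upstream() may differ in ordering or return type. upstream_ids is a hypothetical function, and short names stand in for full node IDs.

```python
from collections import deque


def upstream_ids(edges, node_id, hops):
    """Return IDs reachable by walking edges backwards, at most `hops` steps."""
    # Index predecessors: dst -> [src, ...]
    preds = {}
    for src, dst in edges:
        preds.setdefault(dst, []).append(src)

    seen = {node_id}
    found = []
    queue = deque([(node_id, 0)])
    while queue:
        current, depth = queue.popleft()
        if depth == hops:
            continue  # hop budget exhausted on this branch
        for src in preds.get(current, []):
            if src not in seen:
                seen.add(src)
                found.append(src)
                queue.append((src, depth + 1))
    return found


edges = [
    ("mysql-source", "orders"),
    ("orders", "CSAS_ENRICHED_ORDERS"),
    ("CSAS_ENRICHED_ORDERS", "enriched-orders"),
    ("enriched-orders", "main.sales.orders"),
]
print(upstream_ids(edges, "main.sales.orders", hops=2))
# ['enriched-orders', 'CSAS_ENRICHED_ORDERS']
```

A downstream traversal would be the same walk with the index built as src -> [dst, ...].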
Graph Merging
When a node is added multiple times:
- first_seen is preserved from the first occurrence
- last_seen is updated to the most recent occurrence
- attributes are merged (new values overwrite old ones)
- tags are combined (deduplicated)
When an edge is added multiple times:
- first_seen is preserved
- last_seen is updated
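The node merge rules can be sketched as a pure function. SeenNode and merge_node are hypothetical names used only for illustration. Two details are assumptions rather than documented behavior: last_seen is taken as the later of the two timestamps, and tags keep first-seen order.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class SeenNode:
    """Hypothetical stand-in for LineageNode with the merge-relevant fields."""
    node_id: str
    first_seen: datetime
    last_seen: datetime
    attributes: dict = field(default_factory=dict)
    tags: list = field(default_factory=list)


def merge_node(existing: SeenNode, incoming: SeenNode) -> SeenNode:
    """Combine a re-added node with the copy already in the graph."""
    return SeenNode(
        node_id=existing.node_id,
        first_seen=existing.first_seen,                         # preserved
        last_seen=max(existing.last_seen, incoming.last_seen),  # most recent
        attributes={**existing.attributes, **incoming.attributes},  # new wins
        tags=list(dict.fromkeys(existing.tags + incoming.tags)),    # dedupe
    )


t0, t1 = datetime(2024, 1, 1), datetime(2024, 1, 2)
node_id = "confluent:kafka_topic:env-abc123:orders"
old = SeenNode(node_id, t0, t0, {"partition_count": 6, "replication_factor": 3}, ["pii"])
new = SeenNode(node_id, t1, t1, {"partition_count": 12}, ["pii", "finance"])

merged = merge_node(old, new)
print(merged.attributes)  # {'partition_count': 12, 'replication_factor': 3}
print(merged.tags)        # ['pii', 'finance']
```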
Validation
warnings = graph.validate()
# Returns list of warnings:
# - "Orphan node: {node_id}" (nodes with no edges, excluding schemas)
# - "Dangling edge src: {src_id}" (edge references missing source)
# - "Dangling edge dst: {dst_id}" (edge references missing destination)
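A sketch of how checks like these could be computed over plain data is shown below. validate_graph is a hypothetical function, not the library's implementation; it takes a {node_id: node_type} mapping and a list of (src_id, dst_id) pairs, and the order of the returned warnings is an assumption.

```python
def validate_graph(node_types, edges):
    """Return orphan-node and dangling-edge warnings.

    node_types: {node_id: node_type}; edges: [(src_id, dst_id), ...].
    """
    warnings = []

    # Every ID that appears on either end of at least one edge.
    connected = set()
    for src_id, dst_id in edges:
        connected.update((src_id, dst_id))

    # Orphans: known nodes with no edges, skipping schema nodes.
    for node_id, node_type in node_types.items():
        if node_type != "schema" and node_id not in connected:
            warnings.append(f"Orphan node: {node_id}")

    # Dangling edges: endpoints that are not known nodes.
    for src_id, dst_id in edges:
        if src_id not in node_types:
            warnings.append(f"Dangling edge src: {src_id}")
        if dst_id not in node_types:
            warnings.append(f"Dangling edge dst: {dst_id}")
    return warnings


node_types = {
    "orders": "kafka_topic",
    "unused-topic": "kafka_topic",
    "orders-value:v1": "schema",
}
edges = [("mysql-source", "orders")]
print(validate_graph(node_types, edges))
# ['Orphan node: unused-topic', 'Dangling edge src: mysql-source']
```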
Serialization
# Export to JSON file
graph.to_json_file("lineage_graph.json")
# Load from JSON file
graph = LineageGraph.from_json_file("lineage_graph.json")
# Convert to dict
data = graph.to_dict()
# Returns: {"nodes": [...], "edges": [...]}
Example Graph
Simple Pipeline
mysql-source (CONNECTOR)
|
| PRODUCES
|
v
orders (KAFKA_TOPIC) ----HAS_SCHEMA----> orders-value:v1 (SCHEMA)
|
| CONSUMES
|
v
CSAS_ENRICHED_ORDERS (KSQLDB_QUERY)
|
| PRODUCES
|
v
enriched-orders (KAFKA_TOPIC)
|
| MATERIALIZES
|
v
main.sales.orders (CATALOG_TABLE, catalog_type=UNITY_CATALOG)
Node Attributes
Nodes can carry arbitrary metadata in the attributes dictionary:
Kafka Topic:
{
"partition_count": 6,
"replication_factor": 3,
"retention_ms": 604800000,
"bytes_in_rate": 12345.67, # Added by MetricsClient
"record_in_rate": 123.45
}
Connector:
{
"connector_class": "MySqlSource",
"connector_type": "source",
"tasks_max": 1,
"status": "RUNNING"
}
UC Table:
{
"catalog": "main",
"schema": "sales",
"table": "orders",
"table_type": "MANAGED",
"storage_location": "s3://bucket/path/",
"data_source_format": "DELTA"
}
Graph Statistics
# Node and edge counts
print(f"Nodes: {graph.node_count}")
print(f"Edges: {graph.edge_count}")
# Pipeline count (connected components with ≥1 edge, excluding HAS_SCHEMA)
print(f"Pipelines: {graph.pipeline_count}")
# Breakdown by system
from collections import Counter
system_counts = Counter(n.system.value for n in graph.nodes)
# Example: {'confluent': 45, 'databricks': 10, 'aws': 5}
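One way to read the pipeline count is: drop HAS_SCHEMA edges, then count the weakly connected components that still contain at least one edge. The union-find sketch below follows that reading, which is an assumption; count_pipelines is a hypothetical function and the library may compute pipeline_count differently.

```python
def count_pipelines(edges):
    """Count connected components formed by non-HAS_SCHEMA edges.

    edges: [(src_id, dst_id, edge_type), ...]; direction is ignored.
    """
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for src, dst, edge_type in edges:
        if edge_type == "has_schema":
            continue  # schema links do not join pipelines
        parent[find(src)] = find(dst)

    # Only nodes touched by a counted edge are in `parent`,
    # so every component here has at least one edge.
    return len({find(x) for x in list(parent)})


edges = [
    ("mysql-source", "orders", "produces"),
    ("orders", "orders-value:v1", "has_schema"),
    ("orders", "s3-sink", "consumes"),
    ("clicks", "process-clickstream", "consumes"),
]
print(count_pipelines(edges))  # 2
```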
Next Steps
- Extraction Pipeline - How the graph is built
- Clients - Client protocols and patterns
- Catalog Integration - UC, Glue, Google providers