mltrace.db package

Submodules

mltrace.db.base module

class mltrace.db.base.BaseWithRepr[source]

Bases: object

mltrace.db.models module

class mltrace.db.models.Component(name, description, owner, tags=[])[source]

Bases: sqlalchemy.orm.decl_api.Base

add_tags(tags: List[mltrace.db.models.Tag])[source]
component_runs
description
name
owner
tags
class mltrace.db.models.ComponentRun(component_name)[source]

Bases: sqlalchemy.orm.decl_api.Base

add_input(input: mltrace.db.models.IOPointer)[source]

Add a single input (instance of IOPointer).

add_inputs(inputs: List[mltrace.db.models.IOPointer])[source]

Add a list of inputs (each element should be an instance of IOPointer).

add_notes(notes: str)[source]

Add notes describing details of the component run.

add_output(output: mltrace.db.models.IOPointer)[source]

Add a single output (instance of IOPointer).

add_outputs(outputs: List[mltrace.db.models.IOPointer])[source]

Add a list of outputs (each element should be an instance of IOPointer).

add_staleness_message(message: str)[source]

Adds a staleness message to the component run.

check_completeness() → dict[source]

Returns a dictionary containing a success indicator and error messages.

code_snapshot
component_name
dependencies
end_timestamp
git_hash
git_tags
id
inputs
notes
outputs
set_code_snapshot(code_snapshot: bytes)[source]

Code snapshot setter.

set_end_timestamp(ts: Optional[datetime.datetime] = None)[source]

Call this function to set the end timestamp to a specific timestamp or now.

set_git_hash(git_hash: str)[source]

Git hash setter.

set_git_tags(git_tags: List[str])[source]

Git tag setter.

set_start_timestamp(ts: Optional[datetime.datetime] = None)[source]

Call this function to set the start timestamp to a specific timestamp or now.

set_upstream(dependencies: Union[List[mltrace.db.models.ComponentRun], mltrace.db.models.ComponentRun])[source]

Set dependencies for this ComponentRun. API similar to Airflow set_upstream.
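
The signature above accepts either a single ComponentRun or a list of them. A minimal sketch of how such a union argument can be normalized, using a hypothetical stand-in `Run` class rather than the real mltrace model; skipping duplicates is an assumption of this sketch, not documented behavior:

```python
from typing import List, Union


class Run:
    """Hypothetical stand-in for a component run (not the mltrace model)."""

    def __init__(self, name: str):
        self.name = name
        self.dependencies: List["Run"] = []

    def set_upstream(self, dependencies: Union[List["Run"], "Run"]) -> None:
        # Wrap a single run in a list so both call styles share one code path.
        if not isinstance(dependencies, list):
            dependencies = [dependencies]
        for dep in dependencies:
            # Assumption: an upstream run is recorded at most once.
            if dep not in self.dependencies:
                self.dependencies.append(dep)


clean, featurize, train = Run("clean"), Run("featurize"), Run("train")
train.set_upstream(clean)               # single run
train.set_upstream([clean, featurize])  # list of runs
```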

stale
start_timestamp
class mltrace.db.models.IOPointer(name, pointer_type=<PointerTypeEnum.UNKNOWN: 'UNKNOWN'>)[source]

Bases: sqlalchemy.orm.decl_api.Base

clear_flag()[source]
flag
name
pointer_type
set_flag()[source]
set_pointer_type(pointer_type: mltrace.db.models.PointerTypeEnum)[source]
class mltrace.db.models.PointerTypeEnum(value)[source]

Bases: str, enum.Enum

An enumeration.

DATA = 'DATA'
ENDPOINT = 'ENDPOINT'
MODEL = 'MODEL'
UNKNOWN = 'UNKNOWN'
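
Because the enumeration derives from both str and enum.Enum, its members compare equal to plain strings and can be looked up by value. The sketch below re-declares the four members locally to illustrate this; `parse_pointer_type` is a hypothetical helper, not part of mltrace:

```python
from enum import Enum


class PointerTypeEnum(str, Enum):
    # Local copy of the four members listed above, for illustration only.
    DATA = "DATA"
    ENDPOINT = "ENDPOINT"
    MODEL = "MODEL"
    UNKNOWN = "UNKNOWN"


def parse_pointer_type(raw: str) -> PointerTypeEnum:
    """Hypothetical helper: look up a member by value, else UNKNOWN."""
    try:
        return PointerTypeEnum(raw.upper())
    except ValueError:
        return PointerTypeEnum.UNKNOWN


# The str mixin makes members usable wherever a string is expected.
is_plain_string = isinstance(PointerTypeEnum.ENDPOINT, str)
```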
class mltrace.db.models.Tag(name)[source]

Bases: sqlalchemy.orm.decl_api.Base

name

mltrace.db.store module

class mltrace.db.store.Store(uri: str, delete_first: bool = False)[source]

Bases: object

Helper methods to interact with the db.

add_notes_to_component_run(component_run_id: str, notes: str) → str[source]

Retrieves an existing component run and adds notes.

add_tags_to_component(component_name: str, tags: List[str])[source]

Retrieves an existing component and adds tags.

commit_component_run(component_run: mltrace.db.models.ComponentRun, staleness_threshold: int = 2592000)[source]

Commits a fully initialized component run to the DB.
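
The default staleness_threshold of 2592000 corresponds to 30 days if the value is interpreted as seconds. The unit is an assumption here, since the signature does not state it. A small sketch of that arithmetic, with a hypothetical `is_older_than_threshold` helper:

```python
from datetime import datetime, timedelta

DEFAULT_STALENESS_THRESHOLD = 2592000  # default value from the signature above


def is_older_than_threshold(
    dependency_start: datetime,
    run_start: datetime,
    threshold: int = DEFAULT_STALENESS_THRESHOLD,
) -> bool:
    """Hypothetical check: was the dependency started more than
    `threshold` seconds (assumed unit) before the current run?"""
    return (run_start - dependency_start).total_seconds() > threshold


# 60 s * 60 min * 24 h * 30 days = 2,592,000 s
threshold_as_delta = timedelta(seconds=DEFAULT_STALENESS_THRESHOLD)
```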

create_component(name: str, description: str, owner: str, tags: List[str] = [])[source]

Creates a component entity in the database if it does not already exist.

delete_component(component: mltrace.db.models.Component)[source]
delete_component_run(component_run: mltrace.db.models.ComponentRun)[source]
delete_io_pointer(io_pointer: mltrace.db.models.IOPointer)[source]
get_component(name: str) → mltrace.db.models.Component[source]

Retrieves the component if it exists.

get_component_run(id: str) → mltrace.db.models.ComponentRun[source]

Retrieves the component run if it exists.

get_components_with_owner(owner: str) → List[mltrace.db.models.Component][source]

Returns a list of all the components associated with the specified owner.

get_components_with_tag(tag: str) → List[mltrace.db.models.Component][source]

Returns a list of all the components associated with that tag.

get_history(component_name: str, limit: int = 10, date_lower: Union[datetime.datetime, str] = datetime.datetime(1, 1, 1, 0, 0), date_upper: Union[datetime.datetime, str] = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)) → List[mltrace.db.models.ComponentRun][source]

Gets lineage for the component, or a history of all its runs.
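
The default bounds in the signature above equal datetime.min and datetime.max, so by default no run is excluded by date. A rough sketch of the windowing and limit, using hypothetical (run_id, start time) tuples in place of ComponentRun objects; the newest-first ordering is an assumption, and string dates are not handled here:

```python
from datetime import datetime
from typing import List, Tuple

RunRecord = Tuple[str, datetime]  # hypothetical (run_id, start_timestamp)


def filter_history(
    runs: List[RunRecord],
    limit: int = 10,
    date_lower: datetime = datetime.min,
    date_upper: datetime = datetime.max,
) -> List[RunRecord]:
    # Keep only runs whose start time falls inside the closed window.
    in_window = [r for r in runs if date_lower <= r[1] <= date_upper]
    # Assumption: most recent runs come first.
    in_window.sort(key=lambda r: r[1], reverse=True)
    return in_window[:limit]


runs = [
    ("r1", datetime(2021, 1, 1)),
    ("r2", datetime(2021, 2, 1)),
    ("r3", datetime(2021, 3, 1)),
]
latest_two = filter_history(runs, limit=2)
february = filter_history(
    runs, date_lower=datetime(2021, 1, 15), date_upper=datetime(2021, 2, 15)
)
```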

get_io_pointer(name: str, pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None, create=True) → mltrace.db.models.IOPointer[source]

Returns an IOPointer for the specified path. Retrieves the existing IOPointer if one exists in the DB; otherwise creates a new one if the create flag is set.
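
The retrieve-or-create behavior described above can be sketched with an in-memory dictionary standing in for the database. `PointerRegistry` is hypothetical, and returning None when create is false and nothing is found is an assumption of this sketch; the real method may behave differently in that case:

```python
from typing import Dict, Optional


class PointerRegistry:
    """Hypothetical in-memory stand-in for the IOPointer table."""

    def __init__(self) -> None:
        self._pointers: Dict[str, dict] = {}

    def get_io_pointer(
        self, name: str, pointer_type: Optional[str] = None, create: bool = True
    ) -> Optional[dict]:
        existing = self._pointers.get(name)
        if existing is not None:
            return existing  # reuse the stored pointer
        if not create:
            return None  # assumption: signal "not found" with None
        pointer = {
            "name": name,
            "pointer_type": pointer_type or "UNKNOWN",
            "flag": False,
        }
        self._pointers[name] = pointer
        return pointer


registry = PointerRegistry()
missing = registry.get_io_pointer("features.csv", create=False)
created = registry.get_io_pointer("features.csv", pointer_type="DATA")
fetched = registry.get_io_pointer("features.csv")
```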

get_io_pointers(names: List[str], pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None) → List[mltrace.db.models.IOPointer][source]

Returns IOPointers for the specified path names. Retrieves each existing IOPointer if it exists in the DB; otherwise creates a new one with an inferred pointer type.

get_recent_run_ids(limit: int = 50, last_run_id=None) → List[str][source]

Returns a list of recent component run IDs.

get_tag(name=<class 'str'>) → mltrace.db.models.Tag[source]

Creates a tag around the name if it doesn’t already exist.

initialize_empty_component_run(component_name: str) → mltrace.db.models.ComponentRun[source]

Initializes an empty run for the specified component. Does not commit to the database.

review_flagged_outputs() → Tuple[List[str], List[Tuple[mltrace.db.models.ComponentRun, int]]][source]

Finds common ComponentRuns for a group of flagged outputs.
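
The return type pairs a list of strings with a list of (ComponentRun, int) tuples. One plausible reading, assumed here, is flagged output names plus a count of how many flagged outputs each run contributed to. A sketch of that counting with hypothetical run IDs in place of ComponentRun objects:

```python
from collections import Counter
from typing import Dict, List, Tuple


def rank_common_runs(
    traces: Dict[str, List[str]]
) -> Tuple[List[str], List[Tuple[str, int]]]:
    """Hypothetical helper. `traces` maps each flagged output to the
    IDs of the runs found in its trace."""
    counts: Counter = Counter()
    for run_ids in traces.values():
        counts.update(set(run_ids))  # count each run once per output
    # Most frequently shared runs first; ties broken by run ID.
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return list(traces), ranked


traces = {
    "out_a": ["infer_1", "train_1", "clean_1"],
    "out_b": ["infer_2", "train_1", "clean_1"],
}
flagged, ranked = rank_common_runs(traces)
```

Runs that appear in every flagged output's trace rise to the top, which makes them natural candidates to inspect first.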

set_dependencies_from_inputs(component_run: mltrace.db.models.ComponentRun)[source]

Gets IOPointers associated with component_run’s inputs, checks against any ComponentRun’s outputs, and if there are any matches, sets the ComponentRun’s dependency on the most recent match.
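
The matching described above can be sketched as follows, with a hypothetical `Run` dataclass and pointer names as plain strings. "Most recent" is taken to mean the latest start time, which is an assumption of this sketch:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List


@dataclass
class Run:
    """Hypothetical stand-in for a component run."""
    name: str
    start: datetime
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    dependencies: List["Run"] = field(default_factory=list)


def link_to_latest_producers(run: Run, history: List[Run]) -> None:
    for pointer in run.inputs:
        # Earlier runs that produced this input as one of their outputs.
        producers = [r for r in history if pointer in r.outputs]
        if not producers:
            continue  # nothing in the history produced this input
        latest = max(producers, key=lambda r: r.start)
        if latest not in run.dependencies:
            run.dependencies.append(latest)


old_clean = Run("clean", datetime(2021, 1, 1), outputs=["clean.csv"])
new_clean = Run("clean", datetime(2021, 2, 1), outputs=["clean.csv"])
train = Run("train", datetime(2021, 2, 2), inputs=["clean.csv", "raw.csv"])
link_to_latest_producers(train, [old_clean, new_clean])
```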

set_io_pointer_flag(output_id: str, value: bool)[source]

Sets the flag property of an IOPointer.

trace(output_id: str)[source]

Prints trace for an output id. Returns list of tuples (level, ComponentRun) where level is how many hops away the node is from the node that produced the output_id.
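
The (level, ComponentRun) pairs can be illustrated with a breadth-first walk over dependencies. This is a sketch with a hypothetical `Run` class; the traversal order and the choice to visit each run only once are assumptions, not a description of the library's implementation:

```python
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Run:
    """Hypothetical stand-in for a component run."""
    name: str
    dependencies: List["Run"] = field(default_factory=list)


def trace_levels(root: Run) -> List[Tuple[int, Run]]:
    """Return (level, run) pairs, where level counts hops from `root`."""
    result: List[Tuple[int, Run]] = []
    seen = {id(root)}
    queue = deque([(0, root)])
    while queue:
        level, run = queue.popleft()
        result.append((level, run))
        for dep in run.dependencies:
            if id(dep) not in seen:  # visit each run once
                seen.add(id(dep))
                queue.append((level + 1, dep))
    return result


clean = Run("clean")
train = Run("train", [clean])
infer = Run("infer", [train, clean])  # the run that produced the output
levels = [(level, run.name) for level, run in trace_levels(infer)]
```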

trace_batch(output_ids: List[str])[source]
web_trace(output_id: str)[source]

Prints list of ComponentRuns to display in the UI.

mltrace.db.utils module

Module contents

class mltrace.db.Component(name, description, owner, tags=[])[source]

Bases: sqlalchemy.orm.decl_api.Base

add_tags(tags: List[mltrace.db.models.Tag])[source]
component_runs
description
name
owner
tags
class mltrace.db.ComponentRun(component_name)[source]

Bases: sqlalchemy.orm.decl_api.Base

add_input(input: mltrace.db.models.IOPointer)[source]

Add a single input (instance of IOPointer).

add_inputs(inputs: List[mltrace.db.models.IOPointer])[source]

Add a list of inputs (each element should be an instance of IOPointer).

add_notes(notes: str)[source]

Add notes describing details of the component run.

add_output(output: mltrace.db.models.IOPointer)[source]

Add a single output (instance of IOPointer).

add_outputs(outputs: List[mltrace.db.models.IOPointer])[source]

Add a list of outputs (each element should be an instance of IOPointer).

add_staleness_message(message: str)[source]

Adds a staleness message to the component run.

check_completeness() → dict[source]

Returns a dictionary containing a success indicator and error messages.

code_snapshot
component_name
dependencies
end_timestamp
git_hash
git_tags
id
inputs
notes
outputs
set_code_snapshot(code_snapshot: bytes)[source]

Code snapshot setter.

set_end_timestamp(ts: Optional[datetime.datetime] = None)[source]

Call this function to set the end timestamp to a specific timestamp or now.

set_git_hash(git_hash: str)[source]

Git hash setter.

set_git_tags(git_tags: List[str])[source]

Git tag setter.

set_start_timestamp(ts: Optional[datetime.datetime] = None)[source]

Call this function to set the start timestamp to a specific timestamp or now.

set_upstream(dependencies: Union[List[mltrace.db.models.ComponentRun], mltrace.db.models.ComponentRun])[source]

Set dependencies for this ComponentRun. API similar to Airflow set_upstream.

stale
start_timestamp
class mltrace.db.IOPointer(name, pointer_type=<PointerTypeEnum.UNKNOWN: 'UNKNOWN'>)[source]

Bases: sqlalchemy.orm.decl_api.Base

clear_flag()[source]
flag
name
pointer_type
set_flag()[source]
set_pointer_type(pointer_type: mltrace.db.models.PointerTypeEnum)[source]
class mltrace.db.PointerTypeEnum(value)[source]

Bases: str, enum.Enum

An enumeration.

DATA = 'DATA'
ENDPOINT = 'ENDPOINT'
MODEL = 'MODEL'
UNKNOWN = 'UNKNOWN'
class mltrace.db.Store(uri: str, delete_first: bool = False)[source]

Bases: object

Helper methods to interact with the db.

add_notes_to_component_run(component_run_id: str, notes: str) → str[source]

Retrieves an existing component run and adds notes.

add_tags_to_component(component_name: str, tags: List[str])[source]

Retrieves an existing component and adds tags.

commit_component_run(component_run: mltrace.db.models.ComponentRun, staleness_threshold: int = 2592000)[source]

Commits a fully initialized component run to the DB.

create_component(name: str, description: str, owner: str, tags: List[str] = [])[source]

Creates a component entity in the database if it does not already exist.

delete_component(component: mltrace.db.models.Component)[source]
delete_component_run(component_run: mltrace.db.models.ComponentRun)[source]
delete_io_pointer(io_pointer: mltrace.db.models.IOPointer)[source]
get_component(name: str) → mltrace.db.models.Component[source]

Retrieves the component if it exists.

get_component_run(id: str) → mltrace.db.models.ComponentRun[source]

Retrieves the component run if it exists.

get_components_with_owner(owner: str) → List[mltrace.db.models.Component][source]

Returns a list of all the components associated with the specified owner.

get_components_with_tag(tag: str) → List[mltrace.db.models.Component][source]

Returns a list of all the components associated with that tag.

get_history(component_name: str, limit: int = 10, date_lower: Union[datetime.datetime, str] = datetime.datetime(1, 1, 1, 0, 0), date_upper: Union[datetime.datetime, str] = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)) → List[mltrace.db.models.ComponentRun][source]

Gets lineage for the component, or a history of all its runs.

get_io_pointer(name: str, pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None, create=True) → mltrace.db.models.IOPointer[source]

Returns an IOPointer for the specified path. Retrieves the existing IOPointer if one exists in the DB; otherwise creates a new one if the create flag is set.

get_io_pointers(names: List[str], pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None) → List[mltrace.db.models.IOPointer][source]

Returns IOPointers for the specified path names. Retrieves each existing IOPointer if it exists in the DB; otherwise creates a new one with an inferred pointer type.

get_recent_run_ids(limit: int = 50, last_run_id=None) → List[str][source]

Returns a list of recent component run IDs.

get_tag(name=<class 'str'>) → mltrace.db.models.Tag[source]

Creates a tag around the name if it doesn’t already exist.

initialize_empty_component_run(component_name: str) → mltrace.db.models.ComponentRun[source]

Initializes an empty run for the specified component. Does not commit to the database.

review_flagged_outputs() → Tuple[List[str], List[Tuple[mltrace.db.models.ComponentRun, int]]][source]

Finds common ComponentRuns for a group of flagged outputs.

set_dependencies_from_inputs(component_run: mltrace.db.models.ComponentRun)[source]

Gets IOPointers associated with component_run’s inputs, checks against any ComponentRun’s outputs, and if there are any matches, sets the ComponentRun’s dependency on the most recent match.

set_io_pointer_flag(output_id: str, value: bool)[source]

Sets the flag property of an IOPointer.

trace(output_id: str)[source]

Prints trace for an output id. Returns list of tuples (level, ComponentRun) where level is how many hops away the node is from the node that produced the output_id.

trace_batch(output_ids: List[str])[source]
web_trace(output_id: str)[source]

Prints list of ComponentRuns to display in the UI.

class mltrace.db.Tag(name)[source]

Bases: sqlalchemy.orm.decl_api.Base

name