mltrace.db package¶
Submodules¶
mltrace.db.base module¶
mltrace.db.models module¶
- class mltrace.db.models.Component(name, description, owner, tags=[])[source]¶
Bases:
sqlalchemy.orm.decl_api.Base
- add_tags(tags: List[mltrace.db.models.Tag])[source]¶
- component_runs¶
- description¶
- name¶
- owner¶
- tags¶
- class mltrace.db.models.ComponentRun(component_name)[source]¶
Bases:
sqlalchemy.orm.decl_api.Base
- add_input(input: mltrace.db.models.IOPointer)[source]¶
Add a single input (instance of IOPointer).
- add_inputs(inputs: List[mltrace.db.models.IOPointer])[source]¶
Add a list of inputs (each element should be an instance of IOPointer).
- add_output(output: mltrace.db.models.IOPointer)[source]¶
Add a single output (instance of IOPointer).
- add_outputs(outputs: List[mltrace.db.models.IOPointer])[source]¶
Add a list of outputs (each element should be an instance of IOPointer).
- code_snapshot¶
- component_name¶
- dependencies¶
- end_timestamp¶
- git_hash¶
- git_tags¶
- id¶
- inputs¶
- notes¶
- outputs¶
- set_end_timestamp(ts: Optional[datetime.datetime] = None)[source]¶
Sets the end timestamp to ts, or to the current time when ts is None.
- set_start_timestamp(ts: Optional[datetime.datetime] = None)[source]¶
Sets the start timestamp to ts, or to the current time when ts is None.
- set_upstream(dependencies: Union[List[mltrace.db.models.ComponentRun], mltrace.db.models.ComponentRun])[source]¶
Set dependencies for this ComponentRun. API similar to Airflow set_upstream.
- stale¶
- start_timestamp¶
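The timestamp setters and set_upstream above can be illustrated with a minimal sketch. RunSketch is a hypothetical stand-in written for this example, not mltrace's ComponentRun; it only mirrors the documented signatures (an optional ts that falls back to the current time, and dependencies passed as one run or a list of runs). Appending without duplicates is an assumption about the behavior.

```python
from datetime import datetime
from typing import List, Optional, Union


class RunSketch:
    """Hypothetical stand-in for ComponentRun; not mltrace's implementation."""

    def __init__(self, component_name: str):
        self.component_name = component_name
        self.start_timestamp: Optional[datetime] = None
        self.end_timestamp: Optional[datetime] = None
        self.dependencies: List["RunSketch"] = []

    def set_start_timestamp(self, ts: Optional[datetime] = None) -> None:
        # Use the supplied timestamp, or fall back to the current time.
        self.start_timestamp = ts if ts is not None else datetime.now()

    def set_end_timestamp(self, ts: Optional[datetime] = None) -> None:
        # Same rule as the start timestamp.
        self.end_timestamp = ts if ts is not None else datetime.now()

    def set_upstream(
        self, dependencies: Union[List["RunSketch"], "RunSketch"]
    ) -> None:
        # Accept either a single run or a list of runs.
        if not isinstance(dependencies, list):
            dependencies = [dependencies]
        for dep in dependencies:
            # Assumption: dependencies accumulate and are not duplicated.
            if dep not in self.dependencies:
                self.dependencies.append(dep)


clean = RunSketch("cleaning")
train = RunSketch("training")
train.set_start_timestamp(datetime(2021, 5, 1, 12, 0))  # explicit timestamp
train.set_end_timestamp()                               # defaults to now
train.set_upstream(clean)                               # single run, not a list
```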
- class mltrace.db.models.IOPointer(name, value=b'', pointer_type=PointerTypeEnum.UNKNOWN)[source]¶
Bases:
sqlalchemy.orm.decl_api.Base
- flag¶
- name¶
- pointer_type¶
- set_pointer_type(pointer_type: mltrace.db.models.PointerTypeEnum)[source]¶
- value¶
mltrace.db.store module¶
- class mltrace.db.store.Store(uri: str, delete_first: bool = False)[source]¶
Bases:
object
Helper methods to interact with the db.
- add_notes_to_component_run(component_run_id: str, notes: str) → str [source]¶
Retrieves an existing component run and adds notes.
- add_tags_to_component(component_name: str, tags: List[str])[source]¶
Retrieves an existing component and adds tags.
- commit_component_run(component_run: mltrace.db.models.ComponentRun, staleness_threshold: int = 2592000)[source]¶
Commits a fully initialized component run to the DB.
- create_component(name: str, description: str, owner: str, tags: List[str] = [])[source]¶
Creates a component entity in the database if it does not already exist.
- delete_component(component: mltrace.db.models.Component)[source]¶
- delete_component_run(component_run: mltrace.db.models.ComponentRun)[source]¶
- delete_io_pointer(io_pointer: mltrace.db.models.IOPointer)[source]¶
- get_all_tags() → List[mltrace.db.models.Tag] [source]¶
- get_component(name: str) → mltrace.db.models.Component [source]¶
Retrieves the component if it exists.
- get_component_run(id: str) → mltrace.db.models.ComponentRun [source]¶
Retrieves the component run if it exists.
- get_components(tag: str = '', owner: str = '')[source]¶
Returns a list of all the components associated with the specified owner and/or tag.
- get_history(component_name: str, limit: int = 10, date_lower: Union[datetime.datetime, str] = datetime.datetime(1, 1, 1, 0, 0), date_upper: Union[datetime.datetime, str] = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)) → List[mltrace.db.models.ComponentRun] [source]¶
Gets lineage for the component, or a history of all its runs.
- get_io_pointer(name: str, value: Any = '', pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None, create=True) → mltrace.db.models.IOPointer [source]¶
Returns an IOPointer for the specified path. Retrieves the existing IOPointer if one is in the DB; otherwise creates a new one if the create flag is set.
- get_io_pointers(names: List[str], values: Optional[List[Any]] = None, pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None) → List[mltrace.db.models.IOPointer] [source]¶
Returns IOPointers for the specified path names. For each name, retrieves the existing IOPointer if one is in the DB; otherwise creates a new one with an inferred pointer type.
- get_io_pointers_from_args(**kwargs)[source]¶
Filters kwargs to data and model types, then gets corresponding IOPointers.
- get_recent_run_ids(limit: int = 50, last_run_id=None) → List[str] [source]¶
Returns a list of recent component run IDs.
- get_tag(name=<class 'str'>) → mltrace.db.models.Tag [source]¶
Creates a tag around the name if it doesn’t already exist.
- initialize_empty_component_run(component_name: str) → mltrace.db.models.ComponentRun [source]¶
Initializes an empty run for the specified component. Does not commit to the database.
- review_flagged_outputs() → Tuple[List[str], List[Tuple[mltrace.db.models.ComponentRun, int]]] [source]¶
Finds common ComponentRuns for a group of flagged outputs.
- set_dependencies_from_inputs(component_run: mltrace.db.models.ComponentRun)[source]¶
Gets IOPointers associated with component_run’s inputs, checks against any ComponentRun’s outputs, and if there are any matches, sets the ComponentRun’s dependency on the most recent match.
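The retrieve-or-create behavior described for get_io_pointer can be sketched with an in-memory dictionary. StoreSketch and PointerSketch are hypothetical names for this illustration and do not touch a database. What the real Store does when create is False and nothing is found is not stated above; raising LookupError here is an assumption.

```python
from typing import Dict


class PointerSketch:
    """Hypothetical stand-in for IOPointer."""

    def __init__(self, name: str):
        self.name = name


class StoreSketch:
    """Hypothetical in-memory stand-in for Store; no database involved."""

    def __init__(self) -> None:
        self._pointers: Dict[str, PointerSketch] = {}

    def get_io_pointer(self, name: str, create: bool = True) -> PointerSketch:
        # Retrieve the existing pointer if one is already stored.
        existing = self._pointers.get(name)
        if existing is not None:
            return existing
        # Assumption: a missing pointer with create=False is an error.
        if not create:
            raise LookupError(f"no pointer named {name!r}")
        # Otherwise create, remember, and return a new pointer.
        pointer = PointerSketch(name)
        self._pointers[name] = pointer
        return pointer


store = StoreSketch()
first = store.get_io_pointer("features.csv")   # created on first request
second = store.get_io_pointer("features.csv")  # retrieved on second request
```

Both calls return the same object, so repeated lookups of one path do not produce duplicate pointers.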
mltrace.db.utils module¶
Module contents¶
- class mltrace.db.Component(name, description, owner, tags=[])[source]¶
Bases:
sqlalchemy.orm.decl_api.Base
- add_tags(tags: List[mltrace.db.models.Tag])[source]¶
- component_runs¶
- description¶
- name¶
- owner¶
- tags¶
- class mltrace.db.ComponentRun(component_name)[source]¶
Bases:
sqlalchemy.orm.decl_api.Base
- add_input(input: mltrace.db.models.IOPointer)[source]¶
Add a single input (instance of IOPointer).
- add_inputs(inputs: List[mltrace.db.models.IOPointer])[source]¶
Add a list of inputs (each element should be an instance of IOPointer).
- add_output(output: mltrace.db.models.IOPointer)[source]¶
Add a single output (instance of IOPointer).
- add_outputs(outputs: List[mltrace.db.models.IOPointer])[source]¶
Add a list of outputs (each element should be an instance of IOPointer).
- code_snapshot¶
- component_name¶
- dependencies¶
- end_timestamp¶
- git_hash¶
- git_tags¶
- id¶
- inputs¶
- notes¶
- outputs¶
- set_end_timestamp(ts: Optional[datetime.datetime] = None)[source]¶
Sets the end timestamp to ts, or to the current time when ts is None.
- set_start_timestamp(ts: Optional[datetime.datetime] = None)[source]¶
Sets the start timestamp to ts, or to the current time when ts is None.
- set_upstream(dependencies: Union[List[mltrace.db.models.ComponentRun], mltrace.db.models.ComponentRun])[source]¶
Set dependencies for this ComponentRun. API similar to Airflow set_upstream.
- stale¶
- start_timestamp¶
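For a sense of scale, the default staleness_threshold of 2592000 in Store.commit_component_run equals 30 days if the value is read as seconds. The unit and the comparison rule are not stated in this reference, so the sketch below treats both as assumptions; is_older_than is a hypothetical helper, not part of mltrace, and it does not claim to reproduce how the stale attribute is computed.

```python
from datetime import datetime, timedelta

# Default taken from the commit_component_run signature.
DEFAULT_STALENESS_THRESHOLD = 2592000

# Assumption: the threshold is expressed in seconds.
threshold_days = timedelta(seconds=DEFAULT_STALENESS_THRESHOLD).days


def is_older_than(
    dependency_start: datetime,
    run_start: datetime,
    threshold_seconds: int = DEFAULT_STALENESS_THRESHOLD,
) -> bool:
    # Hypothetical rule: a dependency counts as old when the gap between
    # its start and the dependent run's start exceeds the threshold.
    gap = (run_start - dependency_start).total_seconds()
    return gap > threshold_seconds


old_gap = is_older_than(datetime(2021, 1, 1), datetime(2021, 3, 1))
recent_gap = is_older_than(datetime(2021, 2, 20), datetime(2021, 3, 1))
```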
- class mltrace.db.IOPointer(name, value=b'', pointer_type=PointerTypeEnum.UNKNOWN)[source]¶
Bases:
sqlalchemy.orm.decl_api.Base
- flag¶
- name¶
- pointer_type¶
- set_pointer_type(pointer_type: mltrace.db.models.PointerTypeEnum)[source]¶
- value¶
- class mltrace.db.PointerTypeEnum(value)[source]¶
Bases:
str, enum.Enum
An enumeration.
- DATA = 'DATA'¶
- ENDPOINT = 'ENDPOINT'¶
- MODEL = 'MODEL'¶
- UNKNOWN = 'UNKNOWN'¶
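Because PointerTypeEnum derives from both str and enum.Enum, its members behave as plain strings in comparisons and serialization. The sketch below re-declares the same four members under a hypothetical name, PointerTypeSketch, so that it runs without mltrace installed; the behavior shown comes from Python's enum module rather than from anything specific to mltrace.

```python
import enum
import json


class PointerTypeSketch(str, enum.Enum):
    """Hypothetical re-declaration mirroring the members listed above."""

    DATA = "DATA"
    ENDPOINT = "ENDPOINT"
    MODEL = "MODEL"
    UNKNOWN = "UNKNOWN"


# Members compare equal to their string values.
equals_plain_string = PointerTypeSketch.DATA == "DATA"

# Looking up by value returns the existing member.
looked_up = PointerTypeSketch("MODEL")

# The str mixin lets the json module serialize a member directly.
serialized = json.dumps({"pointer_type": PointerTypeSketch.ENDPOINT})
```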
- class mltrace.db.Store(uri: str, delete_first: bool = False)[source]¶
Bases:
object
Helper methods to interact with the db.
- add_notes_to_component_run(component_run_id: str, notes: str) → str [source]¶
Retrieves an existing component run and adds notes.
- add_tags_to_component(component_name: str, tags: List[str])[source]¶
Retrieves an existing component and adds tags.
- commit_component_run(component_run: mltrace.db.models.ComponentRun, staleness_threshold: int = 2592000)[source]¶
Commits a fully initialized component run to the DB.
- create_component(name: str, description: str, owner: str, tags: List[str] = [])[source]¶
Creates a component entity in the database if it does not already exist.
- delete_component(component: mltrace.db.models.Component)[source]¶
- delete_component_run(component_run: mltrace.db.models.ComponentRun)[source]¶
- delete_io_pointer(io_pointer: mltrace.db.models.IOPointer)[source]¶
- get_all_tags() → List[mltrace.db.models.Tag] [source]¶
- get_component(name: str) → mltrace.db.models.Component [source]¶
Retrieves the component if it exists.
- get_component_run(id: str) → mltrace.db.models.ComponentRun [source]¶
Retrieves the component run if it exists.
- get_components(tag: str = '', owner: str = '')[source]¶
Returns a list of all the components associated with the specified owner and/or tag.
- get_history(component_name: str, limit: int = 10, date_lower: Union[datetime.datetime, str] = datetime.datetime(1, 1, 1, 0, 0), date_upper: Union[datetime.datetime, str] = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)) → List[mltrace.db.models.ComponentRun] [source]¶
Gets lineage for the component, or a history of all its runs.
- get_io_pointer(name: str, value: Any = '', pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None, create=True) → mltrace.db.models.IOPointer [source]¶
Returns an IOPointer for the specified path. Retrieves the existing IOPointer if one is in the DB; otherwise creates a new one if the create flag is set.
- get_io_pointers(names: List[str], values: Optional[List[Any]] = None, pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None) → List[mltrace.db.models.IOPointer] [source]¶
Returns IOPointers for the specified path names. For each name, retrieves the existing IOPointer if one is in the DB; otherwise creates a new one with an inferred pointer type.
- get_io_pointers_from_args(**kwargs)[source]¶
Filters kwargs to data and model types, then gets corresponding IOPointers.
- get_recent_run_ids(limit: int = 50, last_run_id=None) → List[str] [source]¶
Returns a list of recent component run IDs.
- get_tag(name=<class 'str'>) → mltrace.db.models.Tag [source]¶
Creates a tag around the name if it doesn’t already exist.
- initialize_empty_component_run(component_name: str) → mltrace.db.models.ComponentRun [source]¶
Initializes an empty run for the specified component. Does not commit to the database.
- review_flagged_outputs() → Tuple[List[str], List[Tuple[mltrace.db.models.ComponentRun, int]]] [source]¶
Finds common ComponentRuns for a group of flagged outputs.
- set_dependencies_from_inputs(component_run: mltrace.db.models.ComponentRun)[source]¶
Gets IOPointers associated with component_run’s inputs, checks against any ComponentRun’s outputs, and if there are any matches, sets the ComponentRun’s dependency on the most recent match.
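The matching rule described for set_dependencies_from_inputs (compare a run's inputs against other runs' outputs and depend on the most recent match) can be sketched as follows. RunSketch and the free function are hypothetical stand-ins: pointers are reduced to plain names, previously committed runs are passed in as a list instead of being queried from the DB, and "most recent" is taken to mean the latest start_timestamp, which is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List


@dataclass(eq=False)  # compare runs by identity, not by field values
class RunSketch:
    """Hypothetical stand-in for ComponentRun."""

    component_name: str
    start_timestamp: datetime
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    dependencies: List["RunSketch"] = field(default_factory=list)


def set_dependencies_from_inputs(
    run: RunSketch, committed_runs: List[RunSketch]
) -> None:
    for name in run.inputs:
        # Find earlier runs that produced this input as an output.
        producers = [
            r for r in committed_runs if r is not run and name in r.outputs
        ]
        if not producers:
            continue  # input has no known producer, so no dependency
        # Assumption: "most recent" means the latest start timestamp.
        latest = max(producers, key=lambda r: r.start_timestamp)
        if latest not in run.dependencies:
            run.dependencies.append(latest)


old_clean = RunSketch("clean", datetime(2021, 1, 1), outputs=["clean.csv"])
new_clean = RunSketch("clean", datetime(2021, 2, 1), outputs=["clean.csv"])
train = RunSketch(
    "train", datetime(2021, 2, 2), inputs=["clean.csv", "raw.csv"]
)
set_dependencies_from_inputs(train, [old_clean, new_clean])
```

Here train ends up depending only on new_clean: raw.csv has no producer, and old_clean is superseded by the later run that wrote the same output.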