mltrace package

Module contents

class mltrace.Component(name: str = '', owner: str = '', description: str = '', beforeTests: list = [], afterTests: list = [], tags: List[str] = [])[source]

Bases: mltrace.entities.base.Base

afterRun(**local_vars)[source]

Computation to execute after running a component. Will run all test objects listed in afterTests.

property afterTests: list
beforeRun(**kwargs)[source]

Computation to execute before running a component. Will run each test object listed in beforeTests.

property beforeTests: list
property description: str
property name: str
property owner: str
run(inputs: List[str] = [], outputs: List[str] = [], input_vars: List[str] = [], output_vars: List[str] = [], input_kwargs: Dict[str, str] = {}, output_kwargs: Dict[str, str] = {}, endpoint: bool = False, staleness_threshold: int = 2592000, auto_log: bool = False, *user_args, **user_kwargs)[source]

Decorator around the function executed:

    c = Component()

    @c.run
    def my_function(arg1, arg2):
        do_something()

arg1 and arg2 are the arguments passed to the beforeRun and afterRun methods. We first execute the beforeRun method, then the function itself, then the afterRun method with the values of the args at the end of the function.
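The ordering described above (beforeRun, then the function, then afterRun) can be sketched with a plain decorator. This is a minimal stand-in and not mltrace's implementation: `SketchComponent`, `calls`, and `clean` are hypothetical names, and the sketch simply passes the call's keyword arguments to both hooks instead of capturing the function's variables when it exits.

```python
import functools


class SketchComponent:
    """Hypothetical stand-in for a component with before/after hooks."""

    def __init__(self):
        self.calls = []  # records the order in which things ran

    def beforeRun(self, **kwargs):
        self.calls.append(("before", dict(kwargs)))

    def afterRun(self, **local_vars):
        self.calls.append(("after", dict(local_vars)))

    def run(self, func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            self.beforeRun(**kwargs)   # 1. hook that runs first
            result = func(**kwargs)    # 2. the decorated function
            self.calls.append(("function", result))
            self.afterRun(**kwargs)    # 3. hook that runs last
            return result
        return wrapper


c = SketchComponent()


@c.run
def clean(rows, threshold):
    return [r for r in rows if r >= threshold]


result = clean(rows=[1, 5, 9], threshold=5)
order = [name for name, _ in c.calls]
```

Here `order` records the sequence of hook and function calls, which is the only behavior the sketch is meant to illustrate.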

property tags: List[str]
class mltrace.ComponentRun(component_name: str, notes: str = '', start_timestamp: Optional[datetime.datetime] = None, end_timestamp: Optional[datetime.datetime] = None, inputs: List[mltrace.entities.io_pointer.IOPointer] = [], outputs: List[mltrace.entities.io_pointer.IOPointer] = [], git_hash: Optional[str] = None, git_tags: Optional[List[str]] = None, code_snapshot: Optional[str] = None, id: Optional[str] = None, stale: List[str] = [], dependencies: List[str] = [])[source]

Bases: mltrace.entities.base.Base

Component Run abstraction.

add_input(inp: Union[str, mltrace.entities.io_pointer.IOPointer], pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None)[source]

Add a single input (instance of IOPointer).

add_inputs(inputs: List[Union[str, mltrace.entities.io_pointer.IOPointer]])[source]

Add a list of inputs (each element should be an instance of IOPointer).

add_output(out: Union[str, mltrace.entities.io_pointer.IOPointer], pointer_type: Optional[mltrace.db.models.PointerTypeEnum] = None)[source]

Add a single output (instance of IOPointer).

add_outputs(outputs: List[Union[str, mltrace.entities.io_pointer.IOPointer]])[source]

Add a list of outputs (each element should be an instance of IOPointer).

property code_snapshot: str
property component_name: str
property dependencies: List[str]
property end_timestamp: datetime.datetime
property git_hash: str
property git_tags: List[str]
property id: str
property inputs: List[mltrace.entities.io_pointer.IOPointer]
property notes: str
property outputs: List[mltrace.entities.io_pointer.IOPointer]
set_end_timestamp(ts: Optional[datetime.datetime] = None)[source]
set_start_timestamp(ts: Optional[datetime.datetime] = None)[source]
set_upstream(dependencies: Union[str, List[str]])[source]

Set dependencies for this ComponentRun. API similar to Airflow set_upstream. It will grab the most recent run for the dependency name.
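As a rough illustration of "most recent run for the dependency name", the sketch below resolves each dependency name to its latest run. The record layout, `most_recent_run`, and `resolve_upstream` are hypothetical and chosen only for this example; how mltrace stores and queries runs is not shown in this reference.

```python
from datetime import datetime

# Hypothetical run records: (run id, component name, start time).
runs = [
    ("run-1", "cleaning", datetime(2021, 5, 1, 9, 0)),
    ("run-2", "cleaning", datetime(2021, 5, 3, 9, 0)),
    ("run-3", "featuregen", datetime(2021, 5, 2, 9, 0)),
]


def most_recent_run(component_name, records):
    """Return the id of the latest run for component_name, or None."""
    matching = [r for r in records if r[1] == component_name]
    if not matching:
        return None
    return max(matching, key=lambda r: r[2])[0]


def resolve_upstream(dependencies, records):
    """Accept one name or a list of names, mirroring Union[str, List[str]]."""
    if isinstance(dependencies, str):
        dependencies = [dependencies]
    return {name: most_recent_run(name, records) for name in dependencies}


resolved = resolve_upstream(["cleaning", "featuregen"], runs)
single = resolve_upstream("cleaning", runs)
```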

property stale: List[str]
property start_timestamp: datetime.datetime
class mltrace.IOPointer(name: str, value: Any = '', pointer_type: mltrace.db.models.PointerTypeEnum = PointerTypeEnum.UNKNOWN, flag: bool = False)[source]

Bases: mltrace.entities.base.Base

property flag: bool
property name: str
property pointer_type: mltrace.db.models.PointerTypeEnum
property value: Any
class mltrace.Test(name: str = '')[source]

Bases: object

getTestMethods()[source]

Gets all methods in this class whose names start with “test”.

property name
runTests(**kwargs)[source]

Runs all tests in this class.
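One way to picture the discovery and execution of test methods is the sketch below, which uses `inspect.getmembers` from the standard library. `SketchTest` and `OutlierTest` are hypothetical classes written for this example; they are not the mltrace `Test` class, and the real discovery logic may differ.

```python
import inspect


class SketchTest:
    """Hypothetical stand-in for a test container."""

    def getTestMethods(self):
        # Collect bound methods whose names begin with "test".
        members = inspect.getmembers(self, predicate=inspect.ismethod)
        return [method for name, method in members if name.startswith("test")]

    def runTests(self, **kwargs):
        # Call every discovered test with the same keyword arguments.
        for method in self.getTestMethods():
            method(**kwargs)


class OutlierTest(SketchTest):
    def testNonEmpty(self, values):
        assert len(values) > 0, "values must not be empty"

    def testNonNegative(self, values):
        assert all(v >= 0 for v in values), "values must be non-negative"

    def helper(self, values):  # not collected: name lacks the prefix
        return sum(values)


suite = OutlierTest()
found = [m.__name__ for m in suite.getTestMethods()]
suite.runTests(values=[1, 2, 3])  # both tests pass silently
```

In this sketch a failing test raises `AssertionError`, so `suite.runTests(values=[-1])` would stop at `testNonNegative`.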

mltrace.add_notes_to_component_run(component_run_id: str, notes: str) → str[source]

Adds notes to component run.

mltrace.backtrace(output_pointer: str)[source]

Prints trace for an output id. Returns list of tuples (level, ComponentRun) where level is how many hops away the node is from the node that produced the output_id.
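The (level, ComponentRun) shape of the return value can be illustrated with a breadth-first walk over a small dependency graph. This is a sketch under assumptions: the `upstream` dictionary and `trace_levels` are hypothetical, plain strings stand in for ComponentRun objects, and whether mltrace walks the graph breadth-first or visits each node only once is not stated in this reference.

```python
from collections import deque

# Hypothetical dependency graph: run name -> runs it depends on.
upstream = {
    "inference": ["featuregen", "training"],
    "training": ["featuregen"],
    "featuregen": ["cleaning"],
    "cleaning": [],
}


def trace_levels(start, graph):
    """Return (level, node) pairs, where level is the hop count from start."""
    seen = {start}
    queue = deque([(0, start)])
    trace = []
    while queue:
        level, node = queue.popleft()
        trace.append((level, node))
        for parent in graph.get(node, []):
            if parent not in seen:  # visit each node once
                seen.add(parent)
                queue.append((level + 1, parent))
    return trace


trace = trace_levels("inference", upstream)
```

Level 0 is the node that produced the output, level 1 holds its direct dependencies, and so on.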

mltrace.clean_db()[source]

Deletes database and reinitializes tables.

mltrace.create_component(name: str, description: str, owner: str, tags: List[str] = [])[source]

Creates a component entity in the database.

mltrace.create_random_ids(num_outputs=1) → List[str][source]

Returns a list of num_outputs ids that a client can use to tag outputs.

mltrace.flag_output_id(output_id: str) → bool[source]

Sets the flag property of an IOPointer to true.

mltrace.get_all_tags() → List[str][source]
mltrace.get_component_information(component_name: str) → mltrace.entities.base_component.Component[source]

Returns a Component with the name, info, owner, and tags.

mltrace.get_component_run_information(component_run_id: str) → mltrace.entities.component_run.ComponentRun[source]

Returns a ComponentRun object.

mltrace.get_components(tag='', owner='') → List[mltrace.entities.base_component.Component][source]

Returns all components with the specified owner and/or tag. If neither is specified, returns all components.

mltrace.get_db_uri() → str[source]
mltrace.get_git_hash() → str[source]

Gets hash of the parent git repo.

mltrace.get_git_tags() → List[str][source]

Gets the tags associated with the commit of the parent git repo, if any exist. Reference: https://stackoverflow.com/questions/34932306/get-tags-of-a-commit

mltrace.get_history(component_name: str, limit: int = 10, date_lower: Union[datetime.datetime, str] = datetime.datetime(1, 1, 1, 0, 0), date_upper: Union[datetime.datetime, str] = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)) → List[mltrace.entities.component_run.ComponentRun][source]

Returns a list of ComponentRuns that are part of the component’s history.
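The default bounds in the signature are the smallest and largest values `datetime` can represent (`datetime.min` and `datetime.max`), so by default no run is excluded by date. The sketch below shows that kind of filtering on plain timestamps. The `history` function and the sample data are hypothetical, the newest-first ordering is an assumption, and string dates (which the real signature also accepts) are not handled.

```python
from datetime import datetime

# Hypothetical run start times for one component.
starts = [
    datetime(2021, 4, 28),
    datetime(2021, 5, 1),
    datetime(2021, 5, 4),
    datetime(2021, 5, 9),
]


def history(start_times, limit=10,
            date_lower=datetime.min, date_upper=datetime.max):
    """Keep runs inside [date_lower, date_upper], newest first, up to limit."""
    in_range = [t for t in start_times if date_lower <= t <= date_upper]
    return sorted(in_range, reverse=True)[:limit]


everything = history(starts)
recent = history(starts, limit=2, date_lower=datetime(2021, 5, 1))
```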

mltrace.get_io_pointer(io_pointer_id: str, io_pointer_val: Optional[Any] = None, create=True)[source]

Returns IO Pointer metadata.

mltrace.get_recent_run_ids(limit: int = 5, last_run_id=None)[source]

Returns most recent component run ids.

mltrace.load(pathname: str)[source]

Loads joblib file at pathname.

mltrace.log_component_run(component_run: mltrace.entities.component_run.ComponentRun, set_dependencies_from_inputs=True, staleness_threshold: int = 2592000)[source]

Takes client-facing ComponentRun object and logs it to the DB.
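If the default `staleness_threshold` of 2592000 is measured in seconds (an assumption; the unit is not stated here), it corresponds to 30 days. The sketch below checks that arithmetic and shows one hypothetical way such a threshold could be applied; `is_stale` is an illustrative helper, not an mltrace function.

```python
from datetime import datetime, timedelta

STALENESS_THRESHOLD = 2592000  # default from the signature; seconds assumed

threshold = timedelta(seconds=STALENESS_THRESHOLD)


def is_stale(dependency_start, run_start, limit=threshold):
    """Hypothetical check: the dependency ran longer ago than the limit."""
    return run_start - dependency_start > limit


run_start = datetime(2021, 6, 15)
fresh = is_stale(datetime(2021, 6, 1), run_start)  # 14 days earlier
old = is_stale(datetime(2021, 4, 1), run_start)    # 75 days earlier
```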

mltrace.register(component_name: str, inputs: List[str] = [], outputs: List[str] = [], input_vars: List[str] = [], output_vars: List[str] = [], input_kwargs: Dict[str, str] = {}, output_kwargs: Dict[str, str] = {}, endpoint: bool = False, staleness_threshold: int = 2592000, auto_log: bool = False)[source]
mltrace.review_flagged_outputs()[source]

Finds common ComponentRuns for a group of flagged outputs. Returns a list of ComponentRuns and occurrence counts in the group of flagged outputs, sorted by descending count and then alphabetically.
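The ranking described above (descending count, then alphabetical) can be sketched with `collections.Counter`. The `flagged_traces` data is hypothetical, strings stand in for ComponentRun objects, and counting each run once per flagged output is an assumption of this example.

```python
from collections import Counter

# Hypothetical traces: flagged output id -> component runs behind it.
flagged_traces = {
    "out-1": ["inference", "featuregen", "cleaning"],
    "out-2": ["inference", "featuregen"],
    "out-3": ["inference", "training", "cleaning"],
}

counts = Counter()
for run_names in flagged_traces.values():
    counts.update(set(run_names))  # count each run once per output

# Sort by descending count, then alphabetically by name.
ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
```

Runs near the top of `ranked` appear in the traces of the most flagged outputs, which makes them the first candidates to inspect.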

mltrace.save(obj, pathname: Optional[str] = None) → str[source]

Saves joblib object to pathname.

mltrace.set_address(address: str)[source]
mltrace.set_db_uri(uri: str)[source]
mltrace.tag_component(component_name: str, tags: List[str])[source]

Adds tags to existing component.

mltrace.unflag_all()[source]
mltrace.unflag_output_id(output_id: str) → bool[source]

Sets the flag property of an IOPointer to false.

mltrace.web_trace(output_id: str)[source]