adaptive_scheduler.utils module¶
Utility functions for adaptive_scheduler.
- class adaptive_scheduler.utils.LRUCachedCallable(function, *, max_size=128, with_cloudpickle=False)[source]¶
Bases: object
Wraps a function to become cached.
- Parameters:
- class adaptive_scheduler.utils.WrappedFunction(function, *, mode='random_id')[source]¶
Bases: object
A wrapper to allow `cloudpickle.load`ed functions with `ProcessPoolExecutor`.
A wrapper around a serialized function that handles deserialization and caches the deserialized function in the worker process.
- Parameters:
function (Callable[..., Any]) – The function to be serialized and wrapped.
mode (Literal['memory', 'random_id', 'file']) – All of the options avoid sending the function to all workers. If “random_id”, store the serialized function only in the global cache. If “file”, save the serialized function to a file, store the path to the file in the global cache, and keep only the path in this object. If “memory”, store the full serialized function in the object.
- _cache_key¶
The key used to access the deserialized function in the global cache.
Examples
>>> import cloudpickle
>>> def square(x):
...     return x * x
>>> wrapped_function = WrappedFunction(square)
>>> wrapped_function(4)
16
- adaptive_scheduler.utils.add_constant_to_fname(combo, constant, *, folder=None, ext='.pickle', sig_figs=8)[source]¶
Construct old and new filename based on a combo.
Assumes combo2fname has been used to construct the old filename. Adds the constant dict to the combo and returns the new filename too.
Returns a tuple of old_fname and new_fname.
- adaptive_scheduler.utils.add_timing_to_object(obj, timing_dict=None, *, include_private=True, include_magic=False, print_times=True, print_cutoff=0.01)[source]¶
Dynamically updates all methods of an object to measure execution time.
- Parameters:
obj (Any) – The object whose methods should be timed.
timing_dict (dict[str, list[float]] | None) – Optional existing dictionary to store times in.
include_private (bool) – Whether to include private methods (starting with _).
include_magic (bool) – Whether to include magic methods (starting with __).
print_times (bool) – Whether to print execution times.
print_cutoff (float) – Minimum time to print, in seconds.
- Return type:
- Returns:
Dictionary mapping method names to lists of execution times
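As a rough illustration of how add_timing_to_object might instrument methods, the sketch below wraps each bound method of an instance with a timer. It is a simplified stand-in, not the library's code: the helper names add_timing_sketch and _timed are hypothetical, magic methods are always skipped, and printing (print_times, print_cutoff) is left out.

```python
import functools
import inspect
import time


def _timed(name, method, timings):
    """Return a wrapper that records how long each call to `method` takes."""

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return method(*args, **kwargs)
        finally:
            timings.setdefault(name, []).append(time.perf_counter() - start)

    return wrapper


def add_timing_sketch(obj, timing_dict=None, *, include_private=True):
    """Replace the bound methods of `obj` with timed versions."""
    timings = {} if timing_dict is None else timing_dict
    for name, method in inspect.getmembers(obj, inspect.ismethod):
        if name.startswith("__"):
            continue  # Simplification: never wrap magic methods.
        if name.startswith("_") and not include_private:
            continue
        setattr(obj, name, _timed(name, method, timings))
    return timings


class Worker:  # Hypothetical example class.
    def work(self, x):
        return 2 * x

    def _helper(self):
        return 1


worker = Worker()
timings = add_timing_sketch(worker)
doubled = worker.work(3)
worker._helper()
```

After these calls, timings maps "work" and "_helper" to lists with one measured duration each.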
- adaptive_scheduler.utils.atomic_write(dest, mode='w', *args, return_path=False, **kwargs)[source]¶
Write atomically to ‘dest’, using a temporary file in the same directory.
This function has the same signature as ‘open’, except that the default mode is ‘w’, not ‘r’, and there is an additional keyword-only parameter, ‘return_path’. If ‘return_path=True’ then a Path pointing to the (as yet nonexistent) temporary file is yielded, rather than a file handle. This is useful when calling libraries that expect a path, rather than an open file handle.
- Return type:
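The write-to-a-temporary-file-then-rename pattern that atomic_write describes can be sketched with the standard library alone. The function name atomic_write_sketch is hypothetical, the return_path option is omitted, and the library's actual implementation may differ.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def atomic_write_sketch(dest, mode="w", **kwargs):
    """Yield a handle to a temporary file that replaces `dest` on success."""
    dest = Path(dest)
    # Create the temporary file in the same directory so that the final
    # rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=dest.name + ".")
    os.close(fd)
    tmp = Path(tmp_name)
    try:
        with open(tmp, mode, **kwargs) as handle:
            yield handle
        # Only reached if the body of the `with` block did not raise.
        os.replace(tmp, dest)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise


with tempfile.TemporaryDirectory() as folder:
    target = Path(folder) / "result.txt"
    with atomic_write_sketch(target) as f:
        f.write("hello")
    content = target.read_text()
    leftovers = sorted(p.name for p in Path(folder).iterdir())
```

If the body raises, the temporary file is deleted and any existing file at dest is left untouched.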
- adaptive_scheduler.utils.cloudpickle_learners(learners, fnames, *, initializers=None, with_progress_bar=False, empty_copies=True)[source]¶
Save a list of learners to disk using cloudpickle.
Returns the total size of the saved files in bytes and the total time.
- adaptive_scheduler.utils.combine_sequence_learners(learners, big_learner=None)[source]¶
Combine several SequenceLearners into a single one.
Also copy over the data.
Assumes that all learners take the same function.
- Parameters:
learners (list[SequenceLearner]) – List of SequenceLearners.
big_learner (SequenceLearner | None) – A learner to load; if None, a new learner will be generated.
- Returns:
Big SequenceLearner with data from learners.
- Return type:
- adaptive_scheduler.utils.combo2fname(combo, folder=None, ext='.pickle', sig_figs=8)[source]¶
Converts a dict into a human readable filename.
Improved version of combo_to_fname.
- Return type:
- adaptive_scheduler.utils.combo_to_fname(combo, folder=None, ext='.pickle')[source]¶
Converts a dict into a human readable filename.
- Return type:
- adaptive_scheduler.utils.connect_to_ipyparallel(n, profile, timeout=300, folder=None, client_kwargs=None)[source]¶
Connect to an ipcluster on the cluster headnode.
- Parameters:
n (int) – Number of engines to be started.
profile (str) – Profile name of IPython profile.
timeout (int) – Time for which we try to connect to get all the engines.
folder (str | None) – Folder that is added to the path of the engines, e.g. "~/Work/my_current_project".
client_kwargs (dict | None) – Keyword arguments passed to ipyparallel.Client.
- Returns:
An IPyparallel client.
- Return type:
client
- adaptive_scheduler.utils.copy_from_sequence_learner(learner_from, learner_to)[source]¶
Copy the data from a SequenceLearner into a different one.
- Parameters:
learner_from (SequenceLearner) – Learner to take the data from.
learner_to (SequenceLearner) – Learner to tell the data to.
- Return type:
- adaptive_scheduler.utils.expand_dict_columns(df)[source]¶
Expand dict columns in a dataframe.
- Return type:
- adaptive_scheduler.utils.fname_to_dataframe(fname, format='pickle')[source]¶
Convert a learner filename (data) to a filename that is used to save the dataframe.
- Return type:
- adaptive_scheduler.utils.fname_to_learner(fname, *, return_initializer=False)[source]¶
Load a learner from a filename (based on cloudpickled learner).
- Return type:
tuple[BaseLearner,Callable[[],None] |None] |BaseLearner
- adaptive_scheduler.utils.fname_to_learner_fname(fname)[source]¶
Convert a learner filename (data) to a filename used to cloudpickle the learner.
- Return type:
- adaptive_scheduler.utils.load_dataframes(fnames, *, concat=True, read_kwargs=None, format='pickle')[source]¶
Load a list of dataframes from disk.
- adaptive_scheduler.utils.load_parallel(learners, fnames, *, with_progress_bar=True, max_workers=None)[source]¶
Load a sequence of learners in parallel.
- Parameters:
learners (list[BaseLearner]) – The learners to be loaded.
fnames (list[str] | list[Path]) – A list of filenames corresponding to learners.
with_progress_bar (bool) – Display a progress bar using tqdm.
max_workers (int | None) – The maximum number of parallel threads when loading the data. If None, use the maximum number of threads that is possible.
- Return type:
- adaptive_scheduler.utils.log_exception(log, msg, exception)[source]¶
Log an exception with a message.
- Return type:
- adaptive_scheduler.utils.maybe_round(x, sig_figs)[source]¶
Round to specified number of sigfigs if x is a float or complex.
- Return type:
- adaptive_scheduler.utils.round_sigfigs(num, sig_figs)[source]¶
Round to specified number of sigfigs.
From http://code.activestate.com/recipes/578114-round-number-to-specified-number-of-significant-di/
- Return type:
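Rounding to a number of significant figures, as round_sigfigs does, can be sketched as follows. This is one common approach and is not necessarily the recipe linked above; the name round_sigfigs_sketch is hypothetical.

```python
import math


def round_sigfigs_sketch(num, sig_figs):
    """Round `num` to `sig_figs` significant figures."""
    if num == 0:
        return 0.0
    # Position of the leading digit, e.g. 3 for 1234.5 and -3 for 0.0012.
    magnitude = math.floor(math.log10(abs(num)))
    return round(num, sig_figs - 1 - magnitude)


large = round_sigfigs_sketch(123456.0, 2)
small = round_sigfigs_sketch(0.0012345, 3)
```

The number of decimals passed to round is negative for large numbers, which rounds to tens, hundreds, and so on.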
- adaptive_scheduler.utils.save_dataframe(fname, *, format='pickle', save_kwargs=None, expand_dicts=True, atomically=True, **to_dataframe_kwargs)[source]¶
Save the learner’s data to disk as pandas.DataFrame.
- Return type:
Callable[[BaseLearner],None]
- adaptive_scheduler.utils.save_parallel(learners, fnames, *, with_progress_bar=True)[source]¶
Save a sequence of learners in parallel.
Create a cache similar to functools.lru_cache.
This will actually cache the return values of the function, whereas functools.lru_cache will pickle the decorated function each time with an empty cache.
- Return type:
- adaptive_scheduler.utils.shuffle_list(*lists, seed=0)[source]¶
Shuffle multiple lists in the same order.
- Return type:
zip
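Shuffling several lists in the same order, as shuffle_list describes, can be sketched by zipping the lists, shuffling the tuples with a seeded generator, and unzipping again. The name shuffle_lists_sketch is hypothetical, and the resulting order need not match what the library produces for the same seed.

```python
import random


def shuffle_lists_sketch(*lists, seed=0):
    """Shuffle all lists with one shared permutation and return a zip."""
    combined = list(zip(*lists))
    # A dedicated Random instance avoids touching the global random state.
    random.Random(seed).shuffle(combined)
    return zip(*combined)


numbers = [1, 2, 3, 4]
letters = ["a", "b", "c", "d"]
shuffled_numbers, shuffled_letters = shuffle_lists_sketch(numbers, letters)
```

Elements at the same index stay paired: whichever position 1 ends up in, "a" ends up at the same position in the second result.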
- async adaptive_scheduler.utils.sleep_unless_task_is_done(task, sleep_duration, trigger_event=None)[source]¶
Sleep for an interval, unless the task or a trigger event is done earlier.
- Return type:
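The behaviour of sleep_unless_task_is_done can be approximated with asyncio.wait and a timeout. This is a sketch, not the library's code: the name sleep_unless_done_sketch is hypothetical, and returning whether the task finished is an assumption about the return value.

```python
import asyncio


async def sleep_unless_done_sketch(task, sleep_duration, trigger_event=None):
    """Wait up to `sleep_duration` seconds, waking early if the task
    finishes or the optional event is set. Returns True if the task is done.
    """
    waiters = [task]
    trigger = None
    if trigger_event is not None:
        trigger = asyncio.ensure_future(trigger_event.wait())
        waiters.append(trigger)
    done, _pending = await asyncio.wait(
        waiters,
        timeout=sleep_duration,
        return_when=asyncio.FIRST_COMPLETED,
    )
    if trigger is not None and not trigger.done():
        trigger.cancel()  # Do not leave the helper waiting on the event.
    return task in done


async def demo():
    quick = asyncio.ensure_future(asyncio.sleep(0.01))
    finished_early = await sleep_unless_done_sketch(quick, 5)
    slow = asyncio.ensure_future(asyncio.sleep(5))
    timed_out = not await sleep_unless_done_sketch(slow, 0.01)
    slow.cancel()
    return finished_early, timed_out


finished_early, timed_out = asyncio.run(demo())
```

asyncio.wait does not cancel the task when the timeout expires, so the monitored task keeps running after the sleep returns.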
- adaptive_scheduler.utils.smart_goal(goal, learners)[source]¶
Extract a goal from the learners.
- Parameters:
goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – Either a typical callable goal, or integer for number of points goal, or float for loss goal, or None to automatically determine, or datetime.timedelta for a time-based goal.
learners (list[BaseLearner]) – List of learners.
- Return type:
Callable[[adaptive.BaseLearner], bool]
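The type-based dispatch that smart_goal describes can be sketched as below. This is an illustration only: smart_goal_sketch and FakeLearner are hypothetical, the None case (automatic determination from the learners) is left out, and the comparison directions and the moment the timedelta clock starts are assumptions.

```python
from datetime import datetime, timedelta


def smart_goal_sketch(goal):
    """Turn a goal specification into a callable `learner -> bool`."""
    if callable(goal):
        return goal
    if isinstance(goal, bool):
        raise TypeError("bool is not a valid goal")
    if isinstance(goal, int):
        # Assumption: an integer means "at least this many points".
        return lambda learner: learner.npoints >= goal
    if isinstance(goal, float):
        # Assumption: a float means "loss at or below this value".
        return lambda learner: learner.loss() <= goal
    if isinstance(goal, timedelta):
        # Assumption: the clock starts when the goal is created.
        goal = datetime.now() + goal
    if isinstance(goal, datetime):
        deadline = goal
        return lambda learner: datetime.now() >= deadline
    raise TypeError(f"Unsupported goal type: {type(goal).__name__}")


class FakeLearner:  # Hypothetical stand-in for an adaptive learner.
    npoints = 10

    def loss(self):
        return 0.05


learner = FakeLearner()
reached_points = smart_goal_sketch(5)(learner)
reached_loss = smart_goal_sketch(0.01)(learner)
```

With this stand-in learner, the points goal of 5 is met while the loss goal of 0.01 is not.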
- adaptive_scheduler.utils.split_in_balancing_learners(learners, fnames, n_parts, strategy='npoints')[source]¶
Split a list of learners and fnames into adaptive.BalancingLearners.
- Parameters:
learners (list[BaseLearner]) – List of learners.
n_parts (int) – Total number of BalancingLearners.
strategy (str) – Learning strategy of the BalancingLearner.
- Return type:
new_learners, new_fnames
- adaptive_scheduler.utils.split_sequence_in_sequence_learners(function, sequence, n_learners, folder='')[source]¶
Split a sequence into adaptive.SequenceLearners and fnames.
- Parameters:
function (Callable[[Any], Any]) – Function for adaptive.SequenceLearners.
sequence (Sequence[Any]) – The sequence to split into n_learners.
n_learners (int) – Total number of SequenceLearners.
- Return type:
tuple[list[SequenceLearner], list[str]]
- Returns:
new_learners – List of SequenceLearners.
new_fnames – List of str based on a hash of the sequence.
- adaptive_scheduler.utils.split_sequence_learner(big_learner, n_learners, folder='')[source]¶
Split a single SequenceLearner into many.
Split into multiple adaptive.SequenceLearners (with the data loaded) and fnames.
See also split_sequence_in_sequence_learners.
- Parameters:
big_learner (SequenceLearner) – A SequenceLearner instance.
n_learners (int) – Total number of SequenceLearners.
- Return type:
tuple[list[SequenceLearner], list[str]]
- Returns:
new_learners – List of SequenceLearners.
new_fnames – List of str based on a hash of the sequence.
- adaptive_scheduler.utils.track_file_creation_progress(paths_dict, interval=1)[source]¶
Initialize and asynchronously track the progress of file creation.
WARNING: This function modifies the provided dictionary in-place.
This function sets up an asynchronous monitoring system that periodically checks for the existence of specified files or groups of files. Each item in the provided dictionary can be a single file (Path object) or a group of files (tuple of Path objects). The progress is updated for each file or group of files only when all files in the group exist. This allows tracking of complex file creation processes where multiple files together constitute a single unit of work.
The tracking occurs at regular intervals, specified by the user, and updates individual and, if applicable, total progress bars to reflect the current state of file creation. It is particularly useful in environments where files are expected to be created over time and need to be monitored collectively.
- Parameters:
paths_dict (dict[str, set[Union[Path, Tuple[Path, ...]]]]) – A dictionary with keys representing categories and values being sets of file paths (Path objects) or groups of file paths (tuples of Path objects) to monitor.
interval (int) – The time interval (in seconds) at which the progress is updated.
- Returns:
The asyncio Task object that is tracking the file creation progress.
- Return type:
Examples
>>> paths_dict = {
...     "docs": {Path("docs/environment.yml"), (Path("doc1.md"), Path("doc2.md"))},
...     "example2": {Path("/path/to/file3"), Path("/path/to/file4")},
... }
>>> task = track_file_creation_progress(paths_dict)
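The rule that a group of files counts as complete only when every file in it exists can be sketched as a synchronous helper. The name count_completed is hypothetical. Unlike track_file_creation_progress, this sketch does not modify the dictionary, run asynchronously, or draw progress bars.

```python
import tempfile
from pathlib import Path


def count_completed(paths_dict):
    """Return {category: (n_completed, n_total)} for a paths dictionary."""
    progress = {}
    for category, entries in paths_dict.items():
        completed = 0
        for entry in entries:
            # Treat a single Path as a group of one.
            group = entry if isinstance(entry, tuple) else (entry,)
            if all(path.exists() for path in group):
                completed += 1
        progress[category] = (completed, len(entries))
    return progress


with tempfile.TemporaryDirectory() as folder:
    root = Path(folder)
    (root / "a.txt").touch()
    (root / "b.txt").touch()
    example = {
        "single": {root / "a.txt", root / "missing.txt"},
        "grouped": {(root / "a.txt", root / "b.txt"), (root / "a.txt", root / "c.txt")},
    }
    progress = count_completed(example)
```

Here one of two single files exists, and only the first group has all of its files, so each category reports one completed entry out of two.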