adaptive_scheduler.utils module¶
Utility functions for adaptive_scheduler.
- class adaptive_scheduler.utils.LRUCachedCallable(function, *, max_size=128, with_cloudpickle=False)[source]¶
Bases: object
Wraps a function to become cached.
Creates a cache similar to functools.lru_cache. This will actually cache the return values of the function, whereas functools.lru_cache will pickle the decorated function each time with an empty cache.
- Parameters:
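The idea behind caching return values (rather than the decorated function) can be sketched with a plain `OrderedDict`-based class. This is an illustrative stand-in, not the library's implementation; the real class also supports cloudpickle and keyword arguments:

```python
from collections import OrderedDict


class SimpleLRUCachedCallable:
    """Illustrative sketch: cache a function's return values with LRU eviction."""

    def __init__(self, function, *, max_size=128):
        self.function = function
        self.max_size = max_size
        self._cache = OrderedDict()

    def __call__(self, *args):
        key = args  # sketch only; the real class also handles kwargs
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        result = self.function(*args)
        self._cache[key] = result
        if len(self._cache) > self.max_size:
            self._cache.popitem(last=False)  # evict least recently used
        return result
```

Because the cache lives on the instance, the cached values survive serialization of the wrapper, which is the advantage over `functools.lru_cache` noted above.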
- class adaptive_scheduler.utils.WrappedFunction(function, *, mode='random_id')[source]¶
Bases: object
A wrapper to allow `cloudpickle.load`ed functions to be used with `ProcessPoolExecutor`.
A wrapper around a serialized function that handles deserialization and caches the deserialized function in the worker process.
- Parameters:
function (Callable[..., Any]) – The function to be serialized and wrapped.
mode (Literal['memory', 'random_id', 'file']) – All of the options avoid sending the function to all workers. If "random_id", store the serialized function only in the global cache. If "file", save the serialized function to a file and store the path to the file in the global cache; only keep the path in this object. If "memory", store the full serialized function in the object.
- _cache_key¶
The key used to access the deserialized function in the global cache.
Examples
>>> import cloudpickle
>>> def square(x):
...     return x * x
>>> wrapped_function = WrappedFunction(square)
>>> wrapped_function(4)
16
- adaptive_scheduler.utils.add_constant_to_fname(combo, constant, *, folder=None, ext='.pickle', sig_figs=8)[source]¶
Construct old and new filename based on a combo.
Assumes combo2fname has been used to construct the old filename. Adds the constant dict to the combo and returns the new filename too.
Returns a tuple of old_fname and new_fname.
- adaptive_scheduler.utils.atomic_write(dest, mode='w', *args, return_path=False, **kwargs)[source]¶
Write atomically to ‘dest’, using a temporary file in the same directory.
This function has the same signature as ‘open’, except that the default mode is ‘w’, not ‘r’, and there is an additional keyword-only parameter, ‘return_path’. If ‘return_path=True’ then a Path pointing to the (as yet nonexistent) temporary file is yielded, rather than a file handle. This is useful when calling libraries that expect a path, rather than an open file handle.
- Return type:
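The write-then-rename technique behind atomic_write can be sketched with the standard library; this is an illustrative helper, not the library's implementation:

```python
import os
import tempfile
from pathlib import Path


def atomic_write_sketch(dest, data):
    """Write to a temp file in dest's directory, then rename it over dest,
    so readers never observe a partially written file."""
    dest = Path(dest)
    fd, tmp = tempfile.mkstemp(dir=dest.parent, prefix=dest.name)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp, dest)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise
```

The temp file must live in the same directory as the destination because `os.replace` is only atomic within a single filesystem.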
- adaptive_scheduler.utils.cloudpickle_learners(learners, fnames, *, initializers=None, with_progress_bar=False, empty_copies=True)[source]¶
Save a list of learners to disk using cloudpickle.
Returns the total size of the saved files in bytes and the total time.
- adaptive_scheduler.utils.combine_sequence_learners(learners, big_learner=None)[source]¶
Combine several SequenceLearners into a single one. Also copy over the data.
Assumes that all learners take the same function.
- Parameters:
learners (list[SequenceLearner]) – List of SequenceLearners.
big_learner (SequenceLearner | None) – A learner to load; if None, a new learner will be generated.
- Returns:
Big SequenceLearner with data from learners.
- Return type:
- adaptive_scheduler.utils.combo2fname(combo, folder=None, ext='.pickle', sig_figs=8)[source]¶
Convert a dict into a human-readable filename.
Improved version of combo_to_fname.
- Return type:
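The mapping from a parameter combo to a filename can be illustrated with a toy scheme. The `key_value` pairs joined by `__` below are an assumption for illustration only; the real combo2fname also rounds floats to the given number of significant figures:

```python
from pathlib import Path


def combo_to_fname_sketch(combo, folder=None, ext=".pickle"):
    """Hypothetical naming scheme: join 'key_value' pairs with '__'."""
    name = "__".join(f"{k}_{v}" for k, v in combo.items()) + ext
    return Path(folder) / name if folder else Path(name)
```

For example, `combo_to_fname_sketch({"a": 1, "b": 2})` yields a path named `a_1__b_2.pickle` under this toy scheme.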
- adaptive_scheduler.utils.combo_to_fname(combo, folder=None, ext='.pickle')[source]¶
Convert a dict into a human-readable filename.
- Return type:
- adaptive_scheduler.utils.connect_to_ipyparallel(n, profile, timeout=300, folder=None, client_kwargs=None)[source]¶
Connect to an ipcluster on the cluster headnode.
- Parameters:
n (int) – Number of engines to be started.
profile (str) – Profile name of the IPython profile.
timeout (int) – Time for which we try to connect to get all the engines.
folder (str | None) – Folder that is added to the path of the engines, e.g. "~/Work/my_current_project".
client_kwargs (dict | None) – Keyword arguments passed to ipyparallel.Client.
- Returns:
An IPyparallel client.
- Return type:
client
- adaptive_scheduler.utils.copy_from_sequence_learner(learner_from, learner_to)[source]¶
Copy the data from a SequenceLearner into a different one.
- Parameters:
learner_from (SequenceLearner) – Learner to take the data from.
learner_to (SequenceLearner) – Learner to tell the data to.
- Return type:
- adaptive_scheduler.utils.expand_dict_columns(df)[source]¶
Expand dict columns in a dataframe.
- Return type:
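The expansion of dict-valued columns can be illustrated without pandas. This pure-Python sketch (an assumption about the general behavior, not the library's code) flattens dict-valued fields into dotted `column.key` fields, which is the same shape of transformation applied to DataFrame columns:

```python
def expand_dict_rows(rows):
    """Flatten dict-valued fields of each row into 'column.key' fields."""
    expanded = []
    for row in rows:
        flat = {}
        for col, val in row.items():
            if isinstance(val, dict):
                for k, v in val.items():
                    flat[f"{col}.{k}"] = v  # one new column per dict key
            else:
                flat[col] = val
        expanded.append(flat)
    return expanded
```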
- adaptive_scheduler.utils.fname_to_dataframe(fname, format='pickle')[source]¶
Convert a learner filename (data) to the filename used to save the dataframe.
- Return type:
- adaptive_scheduler.utils.fname_to_learner(fname, *, return_initializer=False)[source]¶
Load a learner from a filename (based on cloudpickled learner).
- Return type:
tuple[BaseLearner, Optional[Callable[[], None]]] | BaseLearner
- adaptive_scheduler.utils.fname_to_learner_fname(fname)[source]¶
Convert a learner filename (data) to a filename used to cloudpickle the learner.
- Return type:
- adaptive_scheduler.utils.load_dataframes(fnames, *, concat=True, read_kwargs=None, format='pickle')[source]¶
Load a list of dataframes from disk.
- adaptive_scheduler.utils.load_parallel(learners, fnames, *, with_progress_bar=True, max_workers=None)[source]¶
Load a sequence of learners in parallel.
- Parameters:
learners (list[BaseLearner]) – The learners to be loaded.
fnames (list[str] | list[Path]) – A list of filenames corresponding to learners.
with_progress_bar (bool) – Display a progress bar using tqdm.
max_workers (int | None) – The maximum number of parallel threads when loading the data. If None, use the maximum number of threads that is possible.
- Return type:
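The parallel-loading pattern can be sketched with a thread pool, since file loading is I/O-bound. The `learner.load(fname)` call below is an assumption modeled on the adaptive learner API; this is not the library's implementation:

```python
from concurrent.futures import ThreadPoolExecutor


def load_parallel_sketch(learners, fnames, *, max_workers=None):
    """Pair each learner with its file and load them concurrently in threads."""

    def _load(pair):
        learner, fname = pair
        learner.load(fname)  # assumed learner API (adaptive learners have .load)

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # list() forces iteration so exceptions raised in workers propagate here
        list(ex.map(_load, zip(learners, fnames)))
```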
- adaptive_scheduler.utils.log_exception(log, msg, exception)[source]¶
Log an exception with a message.
- Return type:
- adaptive_scheduler.utils.maybe_round(x, sig_figs)[source]¶
Round to specified number of sigfigs if x is a float or complex.
- Return type:
- adaptive_scheduler.utils.round_sigfigs(num, sig_figs)[source]¶
Round to specified number of sigfigs.
From http://code.activestate.com/recipes/578114-round-number-to-specified-number-of-significant-di/
- Return type:
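The linked ActiveState recipe reduces significant-figure rounding to a single `round` call with a computed number of decimal places. A minimal sketch of that recipe:

```python
import math


def round_sigfigs_sketch(num, sig_figs):
    """Round num to sig_figs significant figures (ActiveState recipe 578114)."""
    if num == 0:
        return 0.0
    # The exponent of the leading digit decides how many decimal places to keep.
    return round(num, -int(math.floor(math.log10(abs(num)))) + (sig_figs - 1))
```

For example, 1234 rounded to 2 significant figures gives 1200, and 0.001234 gives 0.0012.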
- adaptive_scheduler.utils.save_dataframe(fname, *, format='pickle', save_kwargs=None, expand_dicts=True, atomically=True, **to_dataframe_kwargs)[source]¶
Save the learner’s data to disk as a pandas.DataFrame.
- Return type:
Callable[[BaseLearner], None]
- adaptive_scheduler.utils.save_parallel(learners, fnames, *, with_progress_bar=True)[source]¶
Save a sequence of learners in parallel.
- Return type:
- adaptive_scheduler.utils.shuffle_list(*lists, seed=0)[source]¶
Shuffle multiple lists in the same order.
- Return type:
zip
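Shuffling several lists in the same order comes down to zipping them into tuples, shuffling the tuples once with a seeded RNG, and unzipping. An illustrative sketch (not the library's code):

```python
import random


def shuffle_list_sketch(*lists, seed=0):
    """Shuffle multiple equal-length lists in the same order."""
    rng = random.Random(seed)  # seeded for reproducibility
    combined = list(zip(*lists))  # one tuple per aligned position
    rng.shuffle(combined)
    return zip(*combined)  # unzip back into per-list tuples
```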
- async adaptive_scheduler.utils.sleep_unless_task_is_done(task, sleep_duration)[source]¶
Sleep for an interval, unless the task is done before then.
- Return type:
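The "sleep, but wake early if the task finishes" behavior can be sketched with `asyncio.wait_for` plus `asyncio.shield` (shielding stops the timeout from cancelling the task itself). This is an illustrative sketch of the idea, not the library's implementation:

```python
import asyncio


async def sleep_unless_done_sketch(task, sleep_duration):
    """Wait up to sleep_duration seconds, returning early if task finishes."""
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=sleep_duration)
    except asyncio.TimeoutError:
        pass  # task is still running; the caller decides what to do next
```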
- adaptive_scheduler.utils.smart_goal(goal, learners)[source]¶
Extract a goal from the learners.
- Parameters:
goal (Union[Callable[[BaseLearner], bool], int, float, datetime, timedelta, None]) – Either a typical callable goal, an integer for a number-of-points goal, a float for a loss goal, a datetime.timedelta for a time-based goal, or None to automatically determine a goal.
learners (list[BaseLearner]) – List of learners.
- Return type:
Callable[[adaptive.BaseLearner], bool]
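The type-based dispatch described above can be sketched as follows. The `learner.npoints` and `learner.loss()` attributes follow the adaptive learner convention, but this sketch is an assumption about the behavior, not the library's implementation:

```python
import time
from datetime import timedelta


def smart_goal_sketch(goal):
    """Turn an int, float, timedelta, or callable into a goal(learner) -> bool."""
    if callable(goal):
        return goal  # already a predicate
    if isinstance(goal, int):
        return lambda learner: learner.npoints >= goal  # number-of-points goal
    if isinstance(goal, float):
        return lambda learner: learner.loss() <= goal  # loss goal
    if isinstance(goal, timedelta):
        deadline = time.monotonic() + goal.total_seconds()
        return lambda learner: time.monotonic() >= deadline  # time-based goal
    raise TypeError(f"Cannot interpret goal of type {type(goal)}")
```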
- adaptive_scheduler.utils.split_in_balancing_learners(learners, fnames, n_parts, strategy='npoints')[source]¶
Split a list of learners and fnames into adaptive.BalancingLearners.
- Parameters:
learners (list[BaseLearner]) – List of learners.
n_parts (int) – Total number of BalancingLearners.
strategy (str) – Learning strategy of the BalancingLearner.
- Return type:
new_learners, new_fnames
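The core step of such a split is dividing a list into n near-equal parts. A self-contained sketch of that step (the real function additionally wraps each part in an adaptive.BalancingLearner):

```python
def split_sketch(items, n_parts):
    """Split items into n_parts contiguous chunks of near-equal size."""
    k, r = divmod(len(items), n_parts)  # base size and remainder
    parts, start = [], 0
    for i in range(n_parts):
        size = k + (1 if i < r else 0)  # first r chunks get one extra item
        parts.append(items[start:start + size])
        start += size
    return parts
```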
- adaptive_scheduler.utils.split_sequence_in_sequence_learners(function, sequence, n_learners, folder='')[source]¶
Split a sequence into adaptive.SequenceLearners and fnames.
- Parameters:
function (Callable[[Any], Any]) – Function for the adaptive.SequenceLearners.
sequence (Sequence[Any]) – The sequence to split into n_learners.
n_learners (int) – Total number of SequenceLearners.
- Return type:
tuple[list[SequenceLearner], list[str]]
- Returns:
new_learners – List of SequenceLearners.
new_fnames – List of str based on a hash of the sequence.
- adaptive_scheduler.utils.split_sequence_learner(big_learner, n_learners, folder='')[source]¶
Split a single SequenceLearner into many.
Split into multiple adaptive.SequenceLearners (with the data loaded) and fnames.
See also split_sequence_in_sequence_learners.
- Parameters:
big_learner (SequenceLearner) – A SequenceLearner instance.
n_learners (int) – Total number of SequenceLearners.
- Return type:
tuple[list[SequenceLearner], list[str]]
- Returns:
new_learners – List of SequenceLearners.
new_fnames – List of str based on a hash of the sequence.
- adaptive_scheduler.utils.track_file_creation_progress(paths_dict, interval=1)[source]¶
Initialize and asynchronously track the progress of file creation.
WARNING: This function modifies the provided dictionary in-place.
This function sets up an asynchronous monitoring system that periodically checks for the existence of specified files or groups of files. Each item in the provided dictionary can be a single file (Path object) or a group of files (tuple of Path objects). The progress is updated for each file or group of files only when all files in the group exist. This allows tracking of complex file creation processes where multiple files together constitute a single unit of work.
The tracking occurs at regular intervals, specified by the user, and updates individual and, if applicable, total progress bars to reflect the current state of file creation. It is particularly useful in environments where files are expected to be created over time and need to be monitored collectively.
- Parameters:
paths_dict (dict[str, set[Union[Path, Tuple[Path, ...]]]]) – A dictionary with keys representing categories and values being sets of file paths (Path objects) or groups of file paths (tuples of Path objects) to monitor.
interval (int) – The time interval (in seconds) at which the progress is updated.
- Returns:
The asyncio Task object that is tracking the file creation progress.
- Return type:
Examples
>>> paths_dict = {
...     "docs": {Path("docs/environment.yml"), (Path("doc1.md"), Path("doc2.md"))},
...     "example2": {Path("/path/to/file3"), Path("/path/to/file4")},
... }
>>> task = track_file_creation_progress(paths_dict)
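The group-completion rule described above (a tuple of paths counts as done only when every file in it exists) can be sketched with a minimal polling loop. This is an illustrative sketch without the progress bars, not the library's implementation:

```python
import asyncio
from pathlib import Path


def _group_done(entry):
    """A unit of work is done only when every file in the group exists."""
    paths = entry if isinstance(entry, tuple) else (entry,)
    return all(Path(p).exists() for p in paths)


async def track_progress_sketch(paths_dict, interval=1.0):
    """Poll until every file or group of files in paths_dict exists."""
    while True:
        done = {
            category: sum(_group_done(e) for e in entries)
            for category, entries in paths_dict.items()
        }
        if all(done[c] == len(paths_dict[c]) for c in paths_dict):
            return done  # everything exists; report counts per category
        await asyncio.sleep(interval)  # re-check after the interval
```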