adaptive_scheduler.utils module

Utility functions for adaptive_scheduler.

class adaptive_scheduler.utils.LRUCachedCallable(function, *, max_size=128, with_cloudpickle=False)[source]

Bases: object

Wraps a function to become cached.

Parameters:
  • function (Callable[..., Any])

  • max_size (int) – Cache size of the LRU cache, by default 128.

  • with_cloudpickle (bool) – Use cloudpickle for storing the data in memory.

property cache_dict: dict

Returns a copy of the cache.
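
The caching behaviour described above can be illustrated with a small standalone sketch. It is not the library's implementation: the class name SimpleLRUCache and the key construction are hypothetical, and the with_cloudpickle option is left out. It only shows the general idea of an LRU-cached callable whose cache_dict property returns a copy.

```python
from collections import OrderedDict
from typing import Any, Callable


class SimpleLRUCache:
    """Hypothetical stand-in illustrating an LRU-cached callable."""

    def __init__(self, function: Callable[..., Any], *, max_size: int = 128) -> None:
        self.function = function
        self.max_size = max_size
        self._cache: OrderedDict = OrderedDict()

    @property
    def cache_dict(self) -> dict:
        # Return a copy so callers cannot mutate the internal cache.
        return dict(self._cache)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Assumption: all arguments are hashable.
        key = (args, tuple(sorted(kwargs.items())))
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        result = self.function(*args, **kwargs)
        self._cache[key] = result
        if len(self._cache) > self.max_size:
            self._cache.popitem(last=False)  # evict the least recently used entry
        return result


calls = []


def square(x):
    calls.append(x)  # record every real evaluation
    return x * x


cached = SimpleLRUCache(square, max_size=2)
for value in (2, 2, 3, 4):
    cached(value)
```

In this sketch the second call with 2 is served from the cache, and the entry for 2 is evicted once 3 and 4 have been stored, because max_size is 2.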

class adaptive_scheduler.utils.WrappedFunction(function, *, mode='random_id')[source]

Bases: object

A wrapper to allow cloudpickle.load-ed functions with ProcessPoolExecutor.

A wrapper around a serialized function that handles deserialization and caches the deserialized function in the worker process.

Parameters:
  • function (Callable[..., Any]) – The function to be serialized and wrapped.

  • mode (Literal['memory', 'random_id', 'file']) – All of the options avoid sending the function to all workers. If “random_id”, store the serialized function only in the global cache. If “file”, save the serialized function to a file, store the path to the file in the global cache, and keep only the path in this object. If “memory”, store the full serialized function in the object.

_cache_key

The key used to access the deserialized function in the global cache.

Examples

>>> import cloudpickle
>>> def square(x):
...     return x * x
>>> wrapped_function = WrappedFunction(square)
>>> wrapped_function(4)
16

adaptive_scheduler.utils.add_constant_to_fname(combo, constant, *, folder=None, ext='.pickle', sig_figs=8)[source]

Construct old and new filename based on a combo.

Assumes combo2fname has been used to construct the old filename. Adds constant dict to the combo and returns the new filename too.

Returns a tuple of old_fname and new_fname.

Return type:

tuple[Path, Path]

adaptive_scheduler.utils.atomic_write(dest, mode='w', *args, return_path=False, **kwargs)[source]

Write atomically to ‘dest’, using a temporary file in the same directory.

This function has the same signature as ‘open’, except that the default mode is ‘w’, not ‘r’, and there is an additional keyword-only parameter, ‘return_path’. If ‘return_path=True’, a Path pointing to the (as yet nonexistent) temporary file is yielded instead of a file handle. This is useful when calling libraries that expect a path rather than an open file handle.

Return type:

Any
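
The general pattern of writing to a temporary file in the destination directory and then renaming it can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's code: the name atomic_write_sketch is hypothetical, the return_path option is omitted, and os.replace is assumed to be the rename step.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def atomic_write_sketch(dest, mode="w", **kwargs):
    """Hypothetical sketch: write to a temp file, then rename it over dest."""
    dest = Path(dest)
    # Create the temporary file next to the destination so the final
    # rename stays on the same filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=dest.name + ".")
    tmp_path = Path(tmp_name)
    try:
        with os.fdopen(fd, mode, **kwargs) as handle:
            yield handle
        # Only reached if the body of the with-block succeeded.
        os.replace(tmp_path, dest)
    except BaseException:
        # Clean up the partial file and leave any existing dest untouched.
        tmp_path.unlink(missing_ok=True)
        raise


with tempfile.TemporaryDirectory() as folder:
    target = Path(folder) / "result.txt"
    with atomic_write_sketch(target) as f:
        f.write("done")
    content = target.read_text()
    leftovers = [p.name for p in Path(folder).iterdir()]
```

Readers of the destination see either the old file or the complete new one, never a partially written file, because the rename only happens after the handle is closed.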

adaptive_scheduler.utils.cloudpickle_learners(learners, fnames, *, initializers=None, with_progress_bar=False, empty_copies=True)[source]

Save a list of learners to disk using cloudpickle.

Returns the total size of the saved files in bytes and the total time.

Return type:

tuple[int, float]

adaptive_scheduler.utils.combine_sequence_learners(learners, big_learner=None)[source]

Combine several SequenceLearners into a single one.

Also copy over the data.

Assumes that all learners take the same function.

Parameters:
Returns:

Big SequenceLearner with data from learners.

Return type:

adaptive.SequenceLearner

adaptive_scheduler.utils.combo2fname(combo, folder=None, ext='.pickle', sig_figs=8)[source]

Converts a dict into a human readable filename.

Improved version of combo_to_fname.

Return type:

Path

adaptive_scheduler.utils.combo_to_fname(combo, folder=None, ext='.pickle')[source]

Converts a dict into a human readable filename.

Return type:

Path

adaptive_scheduler.utils.connect_to_ipyparallel(n, profile, timeout=300, folder=None, client_kwargs=None)[source]

Connect to an ipcluster on the cluster headnode.

Parameters:
  • n (int) – Number of engines to be started.

  • profile (str) – Profile name of IPython profile.

  • timeout (int) – Time for which we try to connect to get all the engines.

  • folder (str | None) – Folder that is added to the path of the engines, e.g. "~/Work/my_current_project".

  • client_kwargs (dict | None) – Keyword arguments passed to ipyparallel.Client.

Returns:

An IPyparallel client.

Return type:

client

adaptive_scheduler.utils.copy_from_sequence_learner(learner_from, learner_to)[source]

Copy the data from a SequenceLearner into a different one.

Parameters:
Return type:

None

adaptive_scheduler.utils.expand_dict_columns(df)[source]

Expand dict columns in a dataframe.

Return type:

DataFrame

adaptive_scheduler.utils.fname_to_dataframe(fname, format='pickle')[source]

Convert a learner filename (data) to the filename that is used to save the dataframe.

Return type:

Path

adaptive_scheduler.utils.fname_to_learner(fname, *, return_initializer=False)[source]

Load a learner from a filename (based on cloudpickled learner).

Return type:

tuple[BaseLearner, Optional[Callable[[], None]]] | BaseLearner

adaptive_scheduler.utils.fname_to_learner_fname(fname)[source]

Convert a learner filename (data) to a filename used to cloudpickle the learner.

Return type:

Path

adaptive_scheduler.utils.hash_anything(x)[source]

Hash anything.

Return type:

str

adaptive_scheduler.utils.load_dataframes(fnames, *, concat=True, read_kwargs=None, format='pickle')[source]

Load a list of dataframes from disk.

Return type:

DataFrame | list[DataFrame]

adaptive_scheduler.utils.load_parallel(learners, fnames, *, with_progress_bar=True, max_workers=None)[source]

Load a sequence of learners in parallel.

Parameters:
  • learners (list[BaseLearner]) – The learners to be loaded.

  • fnames (list[str] | list[Path]) – A list of filenames corresponding to learners.

  • with_progress_bar (bool) – Display a progress bar using tqdm.

  • max_workers (int | None) – The maximum number of parallel threads when loading the data. If None, use the maximum number of threads that is possible.

Return type:

None

adaptive_scheduler.utils.log_exception(log, msg, exception)[source]

Log an exception with a message.

Return type:

None

adaptive_scheduler.utils.maybe_round(x, sig_figs)[source]

Round to specified number of sigfigs if x is a float or complex.

Return type:

Any

adaptive_scheduler.utils.round_sigfigs(num, sig_figs)[source]

Round to specified number of sigfigs.

From http://code.activestate.com/recipes/578114-round-number-to-specified-number-of-significant-di/

Return type:

float
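
Rounding to a number of significant figures can be sketched with the standard library alone. The function below is an illustrative assumption about the technique (the name round_sigfigs_sketch is hypothetical) and is not copied from the library or from the linked recipe.

```python
import math


def round_sigfigs_sketch(num: float, sig_figs: int) -> float:
    """Hypothetical sketch: round num to sig_figs significant figures."""
    if num == 0:
        return 0.0
    # Position of the leading digit, e.g. 5 for 123456.789 and -3 for 0.00123.
    magnitude = math.floor(math.log10(abs(num)))
    # Shift the rounding position so that sig_figs digits are kept.
    return round(num, sig_figs - 1 - magnitude)


large = round_sigfigs_sketch(123456.789, 3)
small = round_sigfigs_sketch(0.00123456, 2)
```

Here large is 123000.0 and small is 0.0012; the number of decimal places passed to round becomes negative for large magnitudes.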

adaptive_scheduler.utils.save_dataframe(fname, *, format='pickle', save_kwargs=None, expand_dicts=True, atomically=True, **to_dataframe_kwargs)[source]

Save the learner’s data to disk as pandas.DataFrame.

Return type:

Callable[[BaseLearner], None]

adaptive_scheduler.utils.save_parallel(learners, fnames, *, with_progress_bar=True)[source]

Save a sequence of learners in parallel.

Parameters:
  • learners (list[BaseLearner]) – The learners to be saved.

  • fnames (list[str] | list[Path]) – A list of filenames corresponding to learners.

  • with_progress_bar (bool) – Display a progress bar using tqdm.

Return type:

None

adaptive_scheduler.utils.shared_memory_cache(cache_size=128)[source]

Create a cache similar to functools.lru_cache.

This will actually cache the return values of the function, whereas functools.lru_cache will pickle the decorated function each time with an empty cache.

Return type:

Callable

adaptive_scheduler.utils.shuffle_list(*lists, seed=0)[source]

Shuffle multiple lists in the same order.

Return type:

zip
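
One way to shuffle several lists in the same order is to zip them, shuffle the combined rows with a seeded random generator, and unzip again. The sketch below assumes that approach; the name shuffle_together is hypothetical and the library may implement this differently.

```python
import random


def shuffle_together(*lists, seed=0):
    """Hypothetical sketch: shuffle several equally long lists in the same order."""
    combined = list(zip(*lists))  # one row per position
    random.Random(seed).shuffle(combined)  # seeded, so the order is reproducible
    return zip(*combined)  # back to one sequence per input list


names = ["a", "b", "c", "d"]
values = [1, 2, 3, 4]
shuffled_names, shuffled_values = shuffle_together(names, values)
```

Each name stays paired with its original value after shuffling, and the same seed produces the same order on every call.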

async adaptive_scheduler.utils.sleep_unless_task_is_done(task, sleep_duration)[source]

Sleep for an interval, unless the task is done before then.

Return type:

bool
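
The idea of sleeping only as long as a task is still running can be sketched with asyncio.wait and a timeout. The helper name sleep_unless_done is hypothetical, and the meaning of the returned bool (True when the task finished before the interval elapsed) is an assumption of this sketch rather than something the documentation above states.

```python
import asyncio


async def sleep_unless_done(task: asyncio.Task, sleep_duration: float) -> bool:
    """Hypothetical sketch: wait up to sleep_duration seconds for task."""
    # asyncio.wait returns early as soon as the task completes.
    done, _pending = await asyncio.wait([task], timeout=sleep_duration)
    return task in done  # assumed convention: True means the task finished


async def main():
    quick = asyncio.create_task(asyncio.sleep(0.01))
    slow = asyncio.create_task(asyncio.sleep(10))
    quick_result = await sleep_unless_done(quick, 1.0)
    slow_result = await sleep_unless_done(slow, 0.05)
    slow.cancel()  # do not leave the long sleep running
    return quick_result, slow_result


results = asyncio.run(main())
```

A polling loop can use such a helper to stop waiting as soon as the monitored task ends, instead of always sleeping for the full interval.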

adaptive_scheduler.utils.smart_goal(goal, learners)[source]

Extract a goal from the learners.

Parameters:
Return type:

Callable[[adaptive.BaseLearner], bool]

adaptive_scheduler.utils.split(seq, n_parts)[source]

Split up a sequence into n_parts.

Parameters:
  • seq (Iterable) – A list or other iterable that has to be split up.

  • n_parts (int) – The sequence will be split up in this many parts.

Return type:

Iterable[tuple]
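
Splitting a sequence into parts can be sketched by computing a chunk size and slicing. This is an assumption about the general technique, with the hypothetical name split_sketch; the library's function may distribute items differently.

```python
import math
from typing import Iterable, Iterator


def split_sketch(seq: Iterable, n_parts: int) -> Iterator[tuple]:
    """Hypothetical sketch: yield consecutive chunks of seq as tuples."""
    items = list(seq)
    if not items:
        return
    # Chunk size that fits everything into at most n_parts chunks.
    size = math.ceil(len(items) / n_parts)
    for start in range(0, len(items), size):
        yield tuple(items[start:start + size])


parts = list(split_sketch(range(10), 3))
```

With this chunking rule the last part can be shorter than the others, and some inputs yield fewer than n_parts parts (for example, 4 items with n_parts=3 give two chunks of 2).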

adaptive_scheduler.utils.split_in_balancing_learners(learners, fnames, n_parts, strategy='npoints')[source]

Split a list of learners and fnames into adaptive.BalancingLearners.

Parameters:
Return type:

new_learners, new_fnames

adaptive_scheduler.utils.split_sequence_in_sequence_learners(function, sequence, n_learners, folder='')[source]

Split a sequence into adaptive.SequenceLearners and fnames.

Parameters:
Return type:

tuple[list[SequenceLearner], list[str]]

Returns:

  • new_learners – List of SequenceLearners.

  • new_fnames – List of str based on a hash of the sequence.

adaptive_scheduler.utils.split_sequence_learner(big_learner, n_learners, folder='')[source]

Split a single SequenceLearner into many.

Split into multiple adaptive.SequenceLearners (with the data loaded) and fnames.

See also split_sequence_in_sequence_learners.

Parameters:
Return type:

tuple[list[SequenceLearner], list[str]]

Returns:

  • new_learners – List of SequenceLearners.

  • new_fnames – List of str based on a hash of the sequence.

adaptive_scheduler.utils.track_file_creation_progress(paths_dict, interval=1)[source]

Initialize and asynchronously track the progress of file creation.

WARNING: This function modifies the provided dictionary in-place.

This function sets up an asynchronous monitoring system that periodically checks for the existence of specified files or groups of files. Each item in the provided dictionary can be a single file (Path object) or a group of files (tuple of Path objects). The progress is updated for each file or group of files only when all files in the group exist. This allows tracking of complex file creation processes where multiple files together constitute a single unit of work.

The tracking occurs at regular intervals, specified by the user, and updates individual and, if applicable, total progress bars to reflect the current state of file creation. It is particularly useful in environments where files are expected to be created over time and need to be monitored collectively.

Parameters:
  • paths_dict (dict[str, set[Union[Path, Tuple[Path, ...]]]]) – A dictionary with keys representing categories and values being sets of file paths (Path objects) or groups of file paths (tuples of Path objects) to monitor.

  • interval (int) – The time interval (in seconds) at which the progress is updated.

Returns:

The asyncio Task object that is tracking the file creation progress.

Return type:

asyncio.Task

Examples

>>> from pathlib import Path
>>> paths_dict = {
...     "docs": {Path("docs/environment.yml"), (Path("doc1.md"), Path("doc2.md"))},
...     "example2": {Path("/path/to/file3"), Path("/path/to/file4")},
... }
>>> task = track_file_creation_progress(paths_dict)
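
The rule that a group of files only counts once every file in it exists can be sketched as a plain function. The name count_completed is hypothetical; the sketch covers only the existence check for one category and leaves out the asynchronous polling, the progress bars, and the in-place modification of the dictionary.

```python
import tempfile
from pathlib import Path


def count_completed(paths) -> int:
    """Hypothetical sketch: count entries whose files all exist."""
    done = 0
    for entry in paths:
        # A single Path is treated as a group of one.
        group = entry if isinstance(entry, tuple) else (entry,)
        if all(p.exists() for p in group):
            done += 1
    return done


with tempfile.TemporaryDirectory() as folder:
    root = Path(folder)
    single = root / "a.txt"
    group = (root / "b1.txt", root / "b2.txt")
    paths = {single, group}

    single.touch()
    group[0].touch()
    before = count_completed(paths)  # the group is still incomplete

    group[1].touch()
    after = count_completed(paths)  # now both entries are complete
```

A periodic checker could call such a function once per interval for each category and update a progress bar with the result.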