Environment

pop_arg(arg_name, default_value=None)

Get the specified argument and remove it from sys.argv.

Source code in src/super_gradients/common/environment/argparse_utils.py
def pop_arg(arg_name: str, default_value: Any = None) -> Any:
    """Get the specified args and remove them from argv"""

    parser = argparse.ArgumentParser()
    parser.add_argument(f"--{arg_name}", default=default_value)
    args, _ = parser.parse_known_args()

    # Remove the ddp args to not have a conflict with the use of hydra
    for val in filter(lambda x: x.startswith(f"--{arg_name}"), sys.argv):
        EXTRA_ARGS.append(val)
        sys.argv.remove(val)
    return vars(args)[arg_name]
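
A minimal usage sketch (assuming super_gradients is installed): it simulates a CLI flag in sys.argv, pops it with pop_arg, and shows that the flag is gone afterwards. The "--local_rank=2" flag below is purely illustrative.

import sys
from super_gradients.common.environment.argparse_utils import pop_arg

sys.argv.append("--local_rank=2")                    # pretend a launcher passed this flag
local_rank = pop_arg("local_rank", default_value=-1)
print(local_rank)                                    # "2" (argparse returns it as a string)
print(any(a.startswith("--local_rank") for a in sys.argv))  # False: the flag was removed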

pop_local_rank()

Pop the Python arg "local_rank". If it exists, inform the user with a log message; otherwise return -1.

Source code in src/super_gradients/common/environment/argparse_utils.py
def pop_local_rank() -> int:
    """Pop the python arg "local-rank". If exists inform the user with a log, otherwise return -1."""
    local_rank = pop_arg("local_rank", default_value=-1)
    if local_rank != -1:
        logger.info("local_rank was automatically parsed from your config.")
    return local_rank

add_params_to_cfg(cfg, params)

Add parameters to an existing config

Parameters:

Name Type Description Default
cfg DictConfig

OmegaConf config

required
params List[str]

List of parameters to add, in dotlist format (i.e. ["training_hyperparams.resume=True"])

required
Source code in src/super_gradients/common/environment/cfg_utils.py
def add_params_to_cfg(cfg: DictConfig, params: List[str]):
    """Add parameters to an existing config

    :param cfg:     OmegaConf config
    :param params:  List of parameters to add, in dotlist format (i.e. ["training_hyperparams.resume=True"])"""
    new_cfg = OmegaConf.from_dotlist(params)
    override_cfg(cfg, new_cfg)
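
A short sketch of how dotlist parameters patch an existing config (assuming omegaconf and super_gradients are installed); the config keys and values below are made up for illustration.

from omegaconf import OmegaConf
from super_gradients.common.environment.cfg_utils import add_params_to_cfg

cfg = OmegaConf.create({"training_hyperparams": {"resume": False}})
add_params_to_cfg(cfg, params=["training_hyperparams.resume=True", "training_hyperparams.max_epochs=5"])
print(cfg.training_hyperparams.resume)      # True
print(cfg.training_hyperparams.max_epochs)  # 5 (new key, added thanks to open_dict in override_cfg)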

export_recipe(config_name, save_path, config_dir=pkg_resources.resource_filename('super_gradients.recipes', ''))

Saves a complete .yaml file (i.e. with no inheritance from other yaml configuration files) that can be run on its own, without the need to keep the other configurations that the original file inherits from.

Parameters:

Name Type Description Default
config_name str

The .yaml config filename (the .yaml suffix may be omitted).

required
save_path str

The output path where the flattened .yaml config will be saved, as an absolute file system path.

required
config_dir str

The config directory path, as an absolute file system path. Defaults to SG's recipe directory (i.e. path/to/super_gradients/recipes).

pkg_resources.resource_filename('super_gradients.recipes', '')
Source code in src/super_gradients/common/environment/cfg_utils.py
def export_recipe(config_name: str, save_path: str, config_dir: str = pkg_resources.resource_filename("super_gradients.recipes", "")):
    """
    saves a complete (i.e no inheritance from other yaml configuration files),
     .yaml file that can be ran on its own without the need to keep other configurations which the original
      file inherits from.

    :param config_name: The .yaml config filename (can leave the .yaml postfix out, but not mandatory).

    :param save_path: The config directory path, as absolute file system path.
        When None, will use SG's recipe directory (i.e path/to/super_gradients/recipes)

    :param config_dir: The config directory path, as absolute file system path.
        When None, will use SG's recipe directory (i.e path/to/super_gradients/recipes)

    """
    # NEED TO REGISTER RESOLVERS FIRST
    register_hydra_resolvers()
    GlobalHydra.instance().clear()
    with initialize_config_dir(config_dir=normalize_path(config_dir), version_base="1.2"):
        cfg = compose(config_name=config_name)
        OmegaConf.save(config=cfg, f=save_path)
        logger.info(f"Successfully saved recipe at {save_path}. \n" f"Recipe content:\n {cfg}")

load_arch_params(config_name, recipes_dir_path=None, overrides=None)

Load a single arch_params file.

Parameters:

Name Type Description Default
config_name str

Name of the yaml to load (e.g. "resnet18_cifar_arch_params")

required
recipes_dir_path Optional[str]

Optional. Main directory where all recipes are stored (e.g. ../super_gradients/recipes). This directory should include an "arch_params" folder, which itself should include the config file named after config_name.

None
overrides Optional[list]

List of hydra overrides for config file

None
Source code in src/super_gradients/common/environment/cfg_utils.py
def load_arch_params(config_name: str, recipes_dir_path: Optional[str] = None, overrides: Optional[list] = None) -> DictConfig:
    """Load a single arch_params file.
    :param config_name:         Name of the yaml to load (e.g. "resnet18_cifar_arch_params")
    :param recipes_dir_path:    Optional. Main directory where every recipe are stored. (e.g. ../super_gradients/recipes)
                                This directory should include a "arch_params" folder,
                                which itself should include the config file named after config_name.
    :param overrides:           List of hydra overrides for config file
    """
    return load_recipe_from_subconfig(config_name=config_name, recipes_dir_path=recipes_dir_path, overrides=overrides, config_type="arch_params")
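
A minimal sketch, assuming the "resnet18_cifar_arch_params" yaml mentioned above ships with the installed super_gradients package:

from omegaconf import OmegaConf
from super_gradients.common.environment.cfg_utils import load_arch_params

arch_params = load_arch_params("resnet18_cifar_arch_params")
print(OmegaConf.to_yaml(arch_params))  # the resolved DictConfig of the arch_params sub-config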

load_dataset_params(config_name, recipes_dir_path=None, overrides=None)

Load a single dataset_params file.

Parameters:

Name Type Description Default
config_name str

Name of the yaml to load (e.g. "cifar10_dataset_params")

required
recipes_dir_path Optional[str]

Optional. Main directory where all recipes are stored (e.g. ../super_gradients/recipes). This directory should include a "dataset_params" folder, which itself should include the config file named after config_name.

None
overrides Optional[list]

List of hydra overrides for config file

None
Source code in src/super_gradients/common/environment/cfg_utils.py
def load_dataset_params(config_name: str, recipes_dir_path: Optional[str] = None, overrides: Optional[list] = None) -> DictConfig:
    """Load a single dataset_params file.
    :param config_name:         Name of the yaml to load (e.g. "cifar10_dataset_params")
    :param recipes_dir_path:    Optional. Main directory where every recipe are stored. (e.g. ../super_gradients/recipes)
                                This directory should include a "training_hyperparams" folder,
                                which itself should include the config file named after config_name.
    :param overrides:           List of hydra overrides for config file
    """
    return load_recipe_from_subconfig(config_name=config_name, recipes_dir_path=recipes_dir_path, overrides=overrides, config_type="dataset_params")

load_experiment_cfg(experiment_name, ckpt_root_dir=None, run_id=None)

Load the hydra config associated to a specific experiment.

Background information: every time an experiment is launched from a recipe, all the hydra config params are stored in a hidden ".hydra" folder. This hidden folder is used here to recreate the exact same config as the one that was used to launch the experiment (including the hydra overrides).

The motivation is to be able to resume or evaluate an experiment with the exact config that was used when it was initially started, regardless of any changes introduced to the recipe since then, while reusing the same overrides.

Parameters:

Name Type Description Default
experiment_name str

Name of the experiment to resume

required
ckpt_root_dir Optional[str]

Directory including the checkpoints

None
run_id Optional[str]

Optional. Run id of the experiment. If None, the most recent run will be loaded.

None

Returns:

Type Description
DictConfig

The config that was used for that experiment

Source code in src/super_gradients/common/environment/cfg_utils.py
def load_experiment_cfg(experiment_name: str, ckpt_root_dir: Optional[str] = None, run_id: Optional[str] = None) -> DictConfig:
    """
    Load the hydra config associated to a specific experiment.

    Background Information: every time an experiment is launched based on a recipe, all the hydra config params are stored in a hidden folder ".hydra".
    This hidden folder is used here to recreate the exact same config as the one that was used to launch the experiment (Also include hydra overrides).

    The motivation is to be able to resume or evaluate an experiment with the exact same config as the one that was used when the experiment was
    initially started, regardless of any change that might have been introduced to the recipe, and also while using the same overrides that were used
    for that experiment.

    :param experiment_name:     Name of the experiment to resume
    :param ckpt_root_dir:       Directory including the checkpoints
    :param run_id:              Optional. Run id of the experiment. If None, the most recent run will be loaded.
    :return:                    The config that was used for that experiment
    """
    if not experiment_name:
        raise ValueError(f"experiment_name should be non empty string but got :{experiment_name}")

    checkpoints_dir_path = Path(get_checkpoints_dir_path(ckpt_root_dir=ckpt_root_dir, experiment_name=experiment_name, run_id=run_id))
    if not checkpoints_dir_path.exists():
        raise FileNotFoundError(f"Impossible to find checkpoint dir ({checkpoints_dir_path})")

    resume_dir = Path(checkpoints_dir_path) / ".hydra"
    if not resume_dir.exists():
        raise FileNotFoundError(f"The checkpoint directory {checkpoints_dir_path} does not include .hydra artifacts to resume the experiment.")

    # Load overrides that were used in previous run
    overrides_cfg = list(OmegaConf.load(resume_dir / "overrides.yaml"))

    cfg = load_recipe(config_name="config.yaml", recipes_dir_path=normalize_path(str(resume_dir)), overrides=overrides_cfg)
    return cfg
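
A hedged sketch of reloading a previous run's config; the experiment name and checkpoint root below are hypothetical and must point at a run that stored its .hydra artifacts:

from super_gradients.common.environment.cfg_utils import load_experiment_cfg

# Reload the exact config (including overrides) of the most recent run of "my_experiment".
cfg = load_experiment_cfg(experiment_name="my_experiment", ckpt_root_dir="/path/to/my_checkpoints")
# cfg can now be used to resume or evaluate the experiment with the same settings.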

load_recipe(config_name, recipes_dir_path=None, overrides=None)

Load a single file from the recipe directory.

Parameters:

Name Type Description Default
config_name str

Name of the yaml to load (e.g. "cifar10_resnet")

required
recipes_dir_path Optional[str]

Optional. Main directory where all recipes are stored (e.g. ../super_gradients/recipes). This directory should include a folder corresponding to the subconfig, which itself should include the config file named after config_name.

None
overrides Optional[list]

List of hydra overrides for config file

None
Source code in src/super_gradients/common/environment/cfg_utils.py
def load_recipe(config_name: str, recipes_dir_path: Optional[str] = None, overrides: Optional[list] = None) -> DictConfig:
    """Load a single a file of the recipe directory.

    :param config_name:         Name of the yaml to load (e.g. "cifar10_resnet")
    :param recipes_dir_path:    Optional. Main directory where every recipe are stored. (e.g. ../super_gradients/recipes)
                                This directory should include a folder corresponding to the subconfig, which itself should
                                include the config file named after config_name.
    :param overrides:           List of hydra overrides for config file
    """
    GlobalHydra.instance().clear()

    config_dir = recipes_dir_path or pkg_resources.resource_filename("super_gradients.recipes", "")

    with initialize_config_dir(config_dir=normalize_path(config_dir), version_base="1.2"):
        try:
            cfg = compose(config_name=normalize_path(config_name), overrides=overrides if overrides else [])
        except hydra.errors.MissingConfigException:
            raise RecipeNotFoundError(config_name=config_name, config_dir=config_dir, recipes_dir_path=recipes_dir_path)
    return cfg
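
A minimal sketch, assuming super_gradients is installed, using the "cifar10_resnet" recipe and the dotlist-style override shown earlier on this page (the override key is assumed to exist in that recipe):

from super_gradients.common.environment.cfg_utils import load_recipe

cfg = load_recipe("cifar10_resnet", overrides=["training_hyperparams.resume=True"])
print(cfg.training_hyperparams.resume)  # True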

load_recipe_from_subconfig(config_name, config_type, recipes_dir_path=None, overrides=None)

Load a single file (e.g. "resnet18_cifar_arch_params") stored in a subconfig folder (e.g. "arch_params") of the recipe directory.

Parameters:

Name Type Description Default
config_name str

Name of the yaml to load (e.g. "resnet18_cifar_arch_params")

required
config_type str

Type of the subconfig (e.g. "arch_params")

required
recipes_dir_path Optional[str]

Optional. Main directory where all recipes are stored (e.g. ../super_gradients/recipes). This directory should include a folder corresponding to the subconfig, which itself should include the config file named after config_name.

None
overrides Optional[list]

List of hydra overrides for config file

None
Source code in src/super_gradients/common/environment/cfg_utils.py
def load_recipe_from_subconfig(config_name: str, config_type: str, recipes_dir_path: Optional[str] = None, overrides: Optional[list] = None) -> DictConfig:
    """Load a single a file (e.g. "resnet18_cifar_arch_params") stored in a subconfig (e.g. "arch_param") of the recipe directory,.

    :param config_name:         Name of the yaml to load (e.g. "resnet18_cifar_arch_params")
    :param config_type:         Type of the subconfig (e.g. "arch_params")
    :param recipes_dir_path:    Optional. Main directory where every recipe are stored. (e.g. ../super_gradients/recipes)
                                This directory should include a folder corresponding to the subconfig,
                                which itself should include the config file named after config_name.
    :param overrides:           List of hydra overrides for config file
    """

    try:
        cfg = load_recipe(config_name=os.path.join(config_type, config_name), recipes_dir_path=recipes_dir_path, overrides=overrides)
    except RecipeNotFoundError as e:
        postfix_err_msg = (
            f"Note: If your recipe is saved at '{os.path.join(e.config_dir, config_name.replace('.yaml', ''))}.yaml', you can load it with load_recipe(...).\n"
        )

        raise RecipeNotFoundError(
            config_name=config_name,
            config_dir=e.config_dir,
            config_type=config_type,
            recipes_dir_path=recipes_dir_path,
            postfix_err_msg=postfix_err_msg,
        )

    # Because of the way we load the subconfig, cfg will start with a single key corresponding to the type (arch_params, ...) and don't want that.
    cfg = cfg[config_type]

    return cfg

load_training_hyperparams(config_name, recipes_dir_path=None, overrides=None)

Load a single training_hyperparams file.

Parameters:

Name Type Description Default
config_name str

Name of the yaml to load (e.g. "cifar10_resnet_train_params")

required
recipes_dir_path Optional[str]

Optional. Main directory where all recipes are stored (e.g. ../super_gradients/recipes). This directory should include a "training_hyperparams" folder, which itself should include the config file named after config_name.

None
overrides Optional[list]

List of hydra overrides for config file

None
Source code in src/super_gradients/common/environment/cfg_utils.py
def load_training_hyperparams(config_name: str, recipes_dir_path: Optional[str] = None, overrides: Optional[list] = None) -> DictConfig:
    """Load a single training_hyperparams file.
    :param config_name:         Name of the yaml to load (e.g. "cifar10_resnet_train_params")
    :param recipes_dir_path:    Optional. Main directory where every recipe are stored. (e.g. ../super_gradients/recipes)
                                This directory should include a "training_hyperparams" folder,
                                which itself should include the config file named after config_name.
    :param overrides:           List of hydra overrides for config file
    """
    return load_recipe_from_subconfig(config_name=config_name, recipes_dir_path=recipes_dir_path, overrides=overrides, config_type="training_hyperparams")

maybe_instantiate_test_loaders(cfg)

Instantiate test loaders if they are defined in the config.

Parameters:

Name Type Description Default
cfg

Recipe config

required

Returns:

Type Description
Optional[Mapping[str, DataLoader]]

A mapping from dataset name to test loader or None if no test loaders are defined.

Source code in src/super_gradients/common/environment/cfg_utils.py
def maybe_instantiate_test_loaders(cfg) -> Optional[Mapping[str, DataLoader]]:
    """
    Instantiate test loaders if they are defined in the config.

    :param cfg: Recipe config
    :return:    A mapping from dataset name to test loader or None if no test loaders are defined.
    """
    from super_gradients.training.utils.utils import get_param
    from super_gradients.training import dataloaders

    test_loaders = None
    if "test_dataset_params" in cfg.dataset_params:
        test_dataloaders = get_param(cfg, "test_dataloaders")
        test_dataset_params = cfg.dataset_params.test_dataset_params
        test_dataloader_params = get_param(cfg.dataset_params, "test_dataloader_params")

        if test_dataloaders is not None:
            if not isinstance(test_dataloaders, Mapping):
                raise ValueError("`test_dataloaders` should be a mapping from test_loader_name to test_loader_params.")

            if test_dataloader_params is not None and test_dataloader_params.keys() != test_dataset_params.keys():
                raise ValueError("test_dataloader_params and test_dataset_params should have the same keys.")

        test_loaders = {}
        for dataset_name, dataset_params in test_dataset_params.items():
            loader_name = test_dataloaders[dataset_name] if test_dataloaders is not None else None
            dataset_params = test_dataset_params[dataset_name]
            dataloader_params = test_dataloader_params[dataset_name] if test_dataloader_params is not None else cfg.dataset_params.val_dataloader_params
            loader = dataloaders.get(loader_name, dataset_params=dataset_params, dataloader_params=dataloader_params)
            test_loaders[dataset_name] = loader

    return test_loaders
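
The sketch below illustrates the config layout this function looks for; all dataset names and paths are hypothetical, and in practice these keys come from your recipe's dataset_params:

from omegaconf import OmegaConf

cfg = OmegaConf.create(
    {
        "dataset_params": {
            "val_dataloader_params": {"batch_size": 32},   # fallback dataloader params
            # One entry per test set; the keys become the names of the returned loaders.
            "test_dataset_params": {
                "clean_set": {"root_dir": "/data/clean"},   # hypothetical
                "noisy_set": {"root_dir": "/data/noisy"},   # hypothetical
            },
            # Optional per-dataset dataloader params; falls back to val_dataloader_params when absent.
            "test_dataloader_params": {
                "clean_set": {"batch_size": 16},
                "noisy_set": {"batch_size": 16},
            },
        },
    }
)
# maybe_instantiate_test_loaders(cfg) would then return one DataLoader per key
# of test_dataset_params ("clean_set", "noisy_set").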

override_cfg(cfg, overrides)

Override a config in place with hydra overrides.

Parameters:

Name Type Description Default
cfg DictConfig

OmegaConf config

required
overrides Union[DictConfig, Dict[str, Any]]

Dictionary-like object that will be used to override cfg.

required
Source code in src/super_gradients/common/environment/cfg_utils.py
def override_cfg(cfg: DictConfig, overrides: Union[DictConfig, Dict[str, Any]]) -> None:
    """Override inplace a config with a list of hydra overrides
    :param cfg:         OmegaConf config
    :param overrides:   Dictionary like object that will be used to override cfg
    """
    with open_dict(cfg):  # This is required to add new fields to existing config
        cfg.merge_with(overrides)
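
A short sketch (the keys below are made up) showing that override_cfg both updates existing fields and, thanks to open_dict, adds new ones:

from omegaconf import OmegaConf
from super_gradients.common.environment.cfg_utils import override_cfg

cfg = OmegaConf.create({"lr": 0.1})
override_cfg(cfg, {"lr": 0.01, "warmup_epochs": 3})
print(cfg.lr)             # 0.01
print(cfg.warmup_epochs)  # 3 (new field)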

generate_run_id()

Generate a unique run ID based on the current timestamp.

Returns:

Type Description
str

Unique run ID in the format "RUN_<year><month><day>_<hour><minute><second>_<microseconds>" (e.g. "RUN_20230802_131052_651906").

Source code in src/super_gradients/common/environment/checkpoints_dir_utils.py
@execute_and_distribute_from_master
def generate_run_id() -> str:
    """Generate a unique run ID based on the current timestamp.

    :return: Unique run ID. in the format "RUN_<year><month><day>_<hour><minute><second>_<microseconds>" (E.g. "RUN_20230802_131052_651906")
    """
    return datetime.now().strftime("RUN_%Y%m%d_%H%M%S_%f")

get_checkpoints_dir_path(experiment_name, ckpt_root_dir=None, run_id=None)

Get the directory that includes all the checkpoints (and logs) of an experiment. The directory layout is ckpt_root_dir/experiment_name/run_id/...

Parameters:

Name Type Description Default
experiment_name str

Name of the experiment.

required
ckpt_root_dir Optional[str]

Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment. If None, SG will first check if a package named 'checkpoints' exists. If not, SG will look for the root of the project that includes the script that was launched. If not found, raise an error.

None
run_id Optional[str]

Optional. Run id of the experiment. If None, the most recent run will be loaded.

None

Returns:

Type Description
str

Path of folder where the experiment checkpoints and logs will be stored.

Source code in src/super_gradients/common/environment/checkpoints_dir_utils.py
def get_checkpoints_dir_path(experiment_name: str, ckpt_root_dir: Optional[str] = None, run_id: Optional[str] = None) -> str:
    """Get the directory that includes all the checkpoints (and logs) of an experiment.
    ckpt_root_dir
        - experiment_name
            - run_id
                - ...
                - ...

    :param experiment_name:     Name of the experiment.
    :param ckpt_root_dir:       Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment.
                                    If None, SG will first check if a package named 'checkpoints' exists.
                                    If not, SG will look for the root of the project that includes the script that was launched.
                                    If not found, raise an error.
    :param run_id:              Optional. Run id of the experiment. If None, the most recent run will be loaded.
    :return:                    Path of folder where the experiment checkpoints and logs will be stored.
    """
    experiment_dir = get_experiment_dir_path(checkpoints_root_dir=ckpt_root_dir, experiment_name=experiment_name)
    checkpoint_dir = experiment_dir if run_id is None else os.path.join(experiment_dir, run_id)
    os.makedirs(checkpoint_dir, exist_ok=True)
    return checkpoint_dir
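
For example, the latest run of an experiment can be resolved like this (a sketch; "my_experiment" and the checkpoints root are hypothetical and must already exist on disk):

from super_gradients.common.environment.checkpoints_dir_utils import (
    get_checkpoints_dir_path,
    get_latest_run_id,
)

run_id = get_latest_run_id(experiment_name="my_experiment", checkpoints_root_dir="/path/to/checkpoints")
ckpt_dir = get_checkpoints_dir_path(experiment_name="my_experiment", ckpt_root_dir="/path/to/checkpoints", run_id=run_id)
print(ckpt_dir)  # e.g. /path/to/checkpoints/my_experiment/RUN_20230802_131052_651906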

get_ckpt_local_path(experiment_name, ckpt_name, external_checkpoint_path, ckpt_root_dir=None, run_id=None)

Gets the local path to the checkpoint file, which will be:
- By default: YOUR_REPO_ROOT/super_gradients/checkpoints/experiment_name/ckpt_name.
- external_checkpoint_path when external_checkpoint_path != None.
- ckpt_root_dir/experiment_name/ckpt_name when ckpt_root_dir != None.
- If the checkpoint file is remotely located: when overwrite_local_checkpoint=True it will be saved to a temporary path which will be returned, otherwise it will be downloaded to YOUR_REPO_ROOT/super_gradients/checkpoints/experiment_name and overwrite YOUR_REPO_ROOT/super_gradients/checkpoints/experiment_name/ckpt_name if such a file exists.

Parameters:

Name Type Description Default
experiment_name str

Name of the experiment.

required
ckpt_name str

Checkpoint filename

required
external_checkpoint_path str

Full path to checkpoint file (that might be located outside of super_gradients/checkpoints directory)

required
ckpt_root_dir str

Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment. If None, SG will first check if a package named 'checkpoints' exists. If not, SG will look for the root of the project that includes the script that was launched. If not found, raise an error.

None
run_id Optional[str]

Optional. Run id of the experiment. If None, the most recent run will be loaded.

None

Returns:

Type Description
str

Local path to the checkpoint file (str).

Source code in src/super_gradients/common/environment/checkpoints_dir_utils.py
def get_ckpt_local_path(experiment_name: str, ckpt_name: str, external_checkpoint_path: str, ckpt_root_dir: str = None, run_id: Optional[str] = None) -> str:
    """
    Gets the local path to the checkpoint file, which will be:
        - By default: YOUR_REPO_ROOT/super_gradients/checkpoints/experiment_name/ckpt_name.
        - external_checkpoint_path when external_checkpoint_path != None
        - ckpt_root_dir/experiment_name/ckpt_name when ckpt_root_dir != None.
        - if the checkpoint file is remotely located:
            when overwrite_local_checkpoint=True then it will be saved in a temporary path which will be returned,
            otherwise it will be downloaded to YOUR_REPO_ROOT/super_gradients/checkpoints/experiment_name and overwrite
            YOUR_REPO_ROOT/super_gradients/checkpoints/experiment_name/ckpt_name if such file exists.


    :param experiment_name:         Name of the experiment.
    :param ckpt_name:               Checkpoint filename
    :param external_checkpoint_path: Full path to checkpoint file (that might be located outside of super_gradients/checkpoints directory)
    :param ckpt_root_dir:           Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment.
                                        If None, SG will first check if a package named 'checkpoints' exists.
                                        If not, SG will look for the root of the project that includes the script that was launched.
                                        If not found, raise an error.
    :param run_id:                  Optional. Run id of the experiment. If None, the most recent run will be loaded.
    :return:                        Path of folder where the experiment checkpoints and logs will be stored.
     :return: local path of the checkpoint file (Str)
    """
    if external_checkpoint_path:
        return external_checkpoint_path
    else:
        checkpoints_dir_path = get_checkpoints_dir_path(ckpt_root_dir=ckpt_root_dir, experiment_name=experiment_name, run_id=run_id)
        return os.path.join(checkpoints_dir_path, ckpt_name)

get_latest_run_id(experiment_name, checkpoints_root_dir=None)

Parameters:

Name Type Description Default
experiment_name str

Name of the experiment.

required
checkpoints_root_dir Optional[str]

Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment. If None, SG will first check if a package named 'checkpoints' exists. If not, SG will look for the root of the project that includes the script that was launched. If not found, raise an error.

None

Returns:

Type Description
Optional[str]

Latest valid run ID, in the format "RUN_<year><month><day>_<hour><minute><second>_<microseconds>".

Source code in src/super_gradients/common/environment/checkpoints_dir_utils.py
def get_latest_run_id(experiment_name: str, checkpoints_root_dir: Optional[str] = None) -> Optional[str]:
    """
    :param experiment_name:         Name of the experiment.
    :param checkpoints_root_dir:    Path to the directory where all the experiments are organised, each sub-folder representing a specific experiment.
                                    If None, SG will first check if a package named 'checkpoints' exists.
                                    If not, SG will look for the root of the project that includes the script that was launched.
                                    If not found, raise an error.
    :return:                        Latest valid run ID. in the format "RUN_<year>"
    """
    experiment_dir = get_experiment_dir_path(checkpoints_root_dir=checkpoints_root_dir, experiment_name=experiment_name)

    run_dirs = [os.path.join(experiment_dir, folder) for folder in os.listdir(experiment_dir) if is_run_dir(folder)]
    for run_dir in sorted(run_dirs, reverse=True):
        if "ckpt_latest.pth" not in os.listdir(run_dir):
            logger.warning(
                f"Latest run directory {run_dir} does not contain a `ckpt_latest.pth` file, so it cannot be resumed. "
                f"Trying to load the n-1 most recent run..."
            )
        else:
            return os.path.basename(run_dir)

get_project_checkpoints_dir_path()

Get the checkpoints directory at the root of the user's project. Create it if it doesn't exist. Return None if the project root is not found.

Source code in src/super_gradients/common/environment/checkpoints_dir_utils.py
def get_project_checkpoints_dir_path() -> Optional[str]:
    """Get the checkpoints' directory that is at the root of the users project. Create it if it doesn't exist. Return None if root not found."""
    project_root_path = _get_project_root_path()
    if project_root_path is None:
        return None

    checkpoints_path = os.path.join(project_root_path, "checkpoints")
    if not os.path.exists(checkpoints_path):
        os.makedirs(checkpoints_path, exist_ok=True)
        logger.info(f'A checkpoints directory was just created at "{checkpoints_path}". To work with another directory, please set "ckpt_root_dir"')
    return checkpoints_path

is_run_dir(dirname)

Check if a directory is a run directory.

Parameters:

Name Type Description Default
dirname str

Directory name.

required

Returns:

Type Description
bool

True if the directory is a run directory, False otherwise.

Source code in src/super_gradients/common/environment/checkpoints_dir_utils.py
def is_run_dir(dirname: str) -> bool:
    """Check if a directory is a run directory.

    :param dirname: Directory name.
    :return:        True if the directory is a run directory, False otherwise.
    """
    return os.path.basename(dirname).startswith("RUN_")

broadcast_from_master(data)

Broadcast data from the master node to all other nodes. This may be required when you want to compute something only on the master node (e.g. a computationally heavy metric) and don't want to waste the CPU of other nodes doing the same work simultaneously.

Parameters:

Name Type Description Default
data Any

Data to be broadcasted from master node (rank 0)

required

Returns:

Type Description
Any

Data from rank 0 node

Source code in src/super_gradients/common/environment/ddp_utils.py
def broadcast_from_master(data: Any) -> Any:
    """
    Broadcast data from master node to all other nodes. This may be required when you
    want to compute something only on master node (e.g computational-heavy metric) and
    don't want to waste CPU of other nodes doing the same work simultaneously.

    :param data:    Data to be broadcasted from master node (rank 0)
    :return:        Data from rank 0 node
    """
    world_size = get_world_size()
    if world_size == 1:
        return data
    broadcast_list = [data] if dist.get_rank() == 0 else [None]
    dist.broadcast_object_list(broadcast_list, src=0)
    return broadcast_list[0]
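
A minimal sketch: in a single-process run (world_size == 1) the value is returned unchanged, while under DDP only the value computed on rank 0 is kept and shared with every other rank. The metric dict below is hypothetical.

from super_gradients.common.environment.ddp_utils import broadcast_from_master

heavy_metric = {"mAP": 0.42}               # in a real DDP run, computed on rank 0 only
shared_metric = broadcast_from_master(heavy_metric)
print(shared_metric)                       # identical on every rank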

execute_and_distribute_from_master(func)

Decorator to execute a function on the master process and distribute the result to all other processes. Useful in parallel computing scenarios where a computational task needs to be performed only on the master node (e.g., a computationally heavy calculation), and the result must be shared with other nodes without redundant computation.

Example usage:

>>> @execute_and_distribute_from_master
>>> def some_code_to_run(param1, param2):
>>>     return param1 + param2

The wrapped function will only be executed on the master node, and the result will be propagated to all other nodes.

Parameters:

Name Type Description Default
func Callable[..., Any]

The function to be executed on the master process and whose result is to be distributed.

required

Returns:

Type Description
Callable[..., Any]

A wrapper function that encapsulates the execute-and-distribute logic.

Source code in src/super_gradients/common/environment/ddp_utils.py
def execute_and_distribute_from_master(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator to execute a function on the master process and distribute the result to all other processes.
    Useful in parallel computing scenarios where a computational task needs to be performed only on the master
    node (e.g., a computational-heavy calculation), and the result must be shared with other nodes without
    redundant computation.

    Example usage:
        >>> @execute_and_distribute_from_master
        >>> def some_code_to_run(param1, param2):
        >>>     return param1 + param2

    The wrapped function will only be executed on the master node, and the result will be propagated to all
    other nodes.

    :param func:    The function to be executed on the master process and whose result is to be distributed.
    :return:        A wrapper function that encapsulates the execute-and-distribute logic.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Run the function only if it's the master process
        if device_config.assigned_rank <= 0:
            result = func(*args, **kwargs)
        else:
            result = None

        # Broadcast the result from the master process to all nodes
        return broadcast_from_master(result)

    return wrapper

find_free_port()

Find an available port on the current machine/node. Note: there is still a chance the port could be taken by another process before it is used.

Source code in src/super_gradients/common/environment/ddp_utils.py
def find_free_port() -> int:
    """Find an available port of current machine/node.
    Note: there is still a chance the port could be taken by other processes."""

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 will cause the OS to find an available port for us
        sock.bind(("", 0))
        _ip, port = sock.getsockname()
    return port
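
A short sketch of a typical use: picking a rendezvous port before spawning DDP workers, using MASTER_PORT as the conventional torch.distributed environment variable (this wiring is illustrative, not part of the function itself):

import os
from super_gradients.common.environment.ddp_utils import find_free_port

port = find_free_port()
os.environ["MASTER_PORT"] = str(port)  # note: another process could still grab the port first
print(port)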

get_local_rank()

Returns the local rank if running in DDP, and 0 otherwise

Returns:

Type Description

local rank

Source code in src/super_gradients/common/environment/ddp_utils.py
def get_local_rank():
    """
    Returns the local rank if running in DDP, and 0 otherwise
    :return: local rank
    """
    return dist.get_rank() if dist.is_initialized() else 0

get_world_size()

Returns the world size if running in DDP, and 1 otherwise

Returns:

Type Description
int

world size

Source code in src/super_gradients/common/environment/ddp_utils.py
def get_world_size() -> int:
    """
    Returns the world size if running in DDP, and 1 otherwise
    :return: world size
    """
    if not dist.is_available():
        return 1
    if not dist.is_initialized():
        return 1
    return dist.get_world_size()

init_trainer()

Initialize the super_gradients environment.

This function should be the first thing to be called by any code running super_gradients.

Source code in src/super_gradients/common/environment/ddp_utils.py
def init_trainer():
    """
    Initialize the super_gradients environment.

    This function should be the first thing to be called by any code running super_gradients.
    """
    register_hydra_resolvers()
    pop_local_rank()

is_distributed()

Check if current process is a DDP subprocess.

Source code in src/super_gradients/common/environment/ddp_utils.py
def is_distributed() -> bool:
    """Check if current process is a DDP subprocess."""
    return device_config.assigned_rank >= 0

is_launched_using_sg()

Check if the current process is a subprocess launched using SG restart_script_with_ddp

Source code in src/super_gradients/common/environment/ddp_utils.py
def is_launched_using_sg():
    """Check if the current process is a subprocess launched using SG restart_script_with_ddp"""
    return os.environ.get("TORCHELASTIC_RUN_ID") == "sg_initiated"

is_main_process()

Check if the current process is considered the main process (i.e. is responsible for sanity checks, atexit upload, ...). The definition ensures that one and only one process satisfies this condition, regardless of how the run was started.

The rule is as follows:
- If not DDP: the main process is the current process.
- If DDP was launched using SuperGradients: the main process is the launching process (rank=-1).
- If DDP was launched with torch: the main process is rank 0.

Source code in src/super_gradients/common/environment/ddp_utils.py
def is_main_process():
    """Check if current process is considered as the main process (i.e. is responsible for sanity check, atexit upload, ...).
    The definition ensures that 1 and only 1 process follows this condition, regardless of how the run was started.

    The rule is as follow:
        - If not DDP: main process is current process
        - If DDP launched using SuperGradients: main process is the launching process (rank=-1)
        - If DDP launched with torch: main process is rank 0
    """

    if not is_distributed():  # If no DDP, or DDP launching process
        return True
    elif (
        device_config.assigned_rank == 0 and not is_launched_using_sg()
    ):  # If DDP launched using torch.distributed.launch or torchrun, we need to run the check on rank 0
        return True
    else:
        return False

multi_process_safe(func)

A decorator for making sure a function runs only in the main process. If not in DDP mode (local_rank = -1), the function will run. If in DDP mode, the function will run only in the main process (local_rank = 0). This works only for functions with no return value.

Source code in src/super_gradients/common/environment/ddp_utils.py
def multi_process_safe(func):
    """
    A decorator for making sure a function runs only in main process.
    If not in DDP mode (local_rank = -1), the function will run.
    If in DDP mode, the function will run only in the main process (local_rank = 0)
    This works only for functions with no return value
    """

    def do_nothing(*args, **kwargs):
        pass

    @wraps(func)
    def wrapper(*args, **kwargs):
        if device_config.assigned_rank <= 0:
            return func(*args, **kwargs)
        else:
            return do_nothing(*args, **kwargs)

    return wrapper
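
A minimal sketch of guarding a side-effect-only function (the function below is made up):

from super_gradients.common.environment.ddp_utils import multi_process_safe

@multi_process_safe
def write_banner(path: str) -> None:
    with open(path, "w") as f:          # executed only when assigned_rank <= 0
        f.write("training started\n")

write_banner("banner.txt")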

EnvironmentVariables

Class to dynamically get any environment variables.

Source code in src/super_gradients/common/environment/env_variables.py
class EnvironmentVariables:
    """Class to dynamically get any environment variables."""

    # Infra

    @property
    def WANDB_BASE_URL(self) -> str:
        return os.getenv("WANDB_BASE_URL")

    @property
    def AWS_PROFILE(self) -> str:
        return os.getenv("AWS_PROFILE")

    # DDP
    @property
    def LOCAL_RANK(self) -> int:
        return int(os.getenv("LOCAL_RANK", -1))

    # Turn ON/OFF features
    @property
    def CRASH_HANDLER(self) -> str:
        return os.getenv("CRASH_HANDLER", "TRUE")

    @property
    def UPLOAD_LOGS(self) -> bool:
        return os.getenv("UPLOAD_LOGS", "TRUE") == "TRUE"

    @property
    def FILE_LOG_LEVEL(self) -> str:
        return os.getenv("FILE_LOG_LEVEL", default="DEBUG").upper()

    @property
    def CONSOLE_LOG_LEVEL(self) -> str:
        return os.getenv("CONSOLE_LOG_LEVEL", default="INFO").upper()

    @property
    def HYDRA_FULL_ERROR(self) -> Optional[str]:
        return os.getenv("HYDRA_FULL_ERROR")

    @HYDRA_FULL_ERROR.setter
    def HYDRA_FULL_ERROR(self, value: str):
        os.environ["HYDRA_FULL_ERROR"] = value

    @property
    def SUPER_GRADIENTS_LOG_DIR(self) -> str:
        return os.getenv("SUPER_GRADIENTS_LOG_DIR", default=str(Path.home() / "sg_logs"))

get_cpu_percent()

Average CPU utilization across all cores, in percent.

Source code in src/super_gradients/common/environment/monitoring/cpu.py
def get_cpu_percent() -> float:
    """Average of all the CPU utilization."""
    return psutil.cpu_percent(interval=None, percpu=False)

GPUStatAggregatorIterator dataclass

Iterator over multiple StatAggregator objects that accumulate samples and aggregate them, one per NVIDIA device.

Parameters:

Name Type Description Default
name str

Name of the statistic

required
device_sampling_fn

How the statistic is sampled

required
device_aggregate_fn

How the statistic samples are aggregated

required
Source code in src/super_gradients/common/environment/monitoring/data_models.py
@dataclasses.dataclass
class GPUStatAggregatorIterator:
    """Iterator of multiple StatAggregator, that accumulate samples and aggregates them for each NVIDIA device.

    :param name:            Name of the statistic
    :param sampling_fn:     How the statistic is sampled
    :param aggregate_fn:    How the statistic samples are aggregated
    """

    name: str
    device_sampling_fn: Callable
    device_aggregate_fn: Callable
    _per_device_stat_aggregator: List[StatAggregator] = dataclasses.field(init=False)

    def __post_init__(self):
        """Initialize nvidia_management_lib and create a list of StatAggregator, one for each NVIDIA device."""
        init_nvidia_management_lib()
        self._per_device_stat_aggregator = [
            StatAggregator(name=f"{self.name}/device_{i}", sampling_fn=partial(self.device_sampling_fn, i), aggregate_fn=self.device_aggregate_fn)
            for i in range(count_gpus())
        ]

    def __iter__(self) -> Iterator[StatAggregator]:
        """Iterate over the StatAggregator of each node"""
        return iter(self._per_device_stat_aggregator)

__iter__()

Iterate over the StatAggregator of each device.

Source code in src/super_gradients/common/environment/monitoring/data_models.py
def __iter__(self) -> Iterator[StatAggregator]:
    """Iterate over the StatAggregator of each node"""
    return iter(self._per_device_stat_aggregator)

__post_init__()

Initialize nvidia_management_lib and create a list of StatAggregator, one for each NVIDIA device.

Source code in src/super_gradients/common/environment/monitoring/data_models.py
def __post_init__(self):
    """Initialize nvidia_management_lib and create a list of StatAggregator, one for each NVIDIA device."""
    init_nvidia_management_lib()
    self._per_device_stat_aggregator = [
        StatAggregator(name=f"{self.name}/device_{i}", sampling_fn=partial(self.device_sampling_fn, i), aggregate_fn=self.device_aggregate_fn)
        for i in range(count_gpus())
    ]

StatAggregator dataclass

Accumulate statistic samples and aggregate them.

Parameters:

Name Type Description Default
name str

Name of the statistic

required
sampling_fn Callable

How the statistic is sampled

required
aggregate_fn Callable[[List[Any], float], float]

How the statistic samples are aggregated; it has to take "samples: List[Any]" and "time: float" as parameters.

required
reset_callback_fn Optional[Callable]

Optional, can be used to reset any system metric

None
Source code in src/super_gradients/common/environment/monitoring/data_models.py
@dataclasses.dataclass
class StatAggregator:
    """Accumulate statistics samples and aggregates them.

    :param name:                Name of the statistic
    :param sampling_fn:         How the statistic is sampled
    :param aggregate_fn:        How the statistic samples are aggregated, has to take "samples: List[Any]" and "time: float" as parameters
    :param reset_callback_fn:   Optional, can be used to reset any system metric
    """

    name: str
    sampling_fn: Callable
    aggregate_fn: Callable[[List[Any], float], float]
    reset_callback_fn: Optional[Callable] = None
    _samples: List = dataclasses.field(default_factory=list)
    _reset_time: float = None

    def sample(self):
        try:
            self._samples.append(self.sampling_fn())
        except Exception:
            pass

    def aggregate(self) -> Union[float, None]:
        if len(self._samples) > 0:
            time_diff = time.time() - self._reset_time
            return self.aggregate_fn(self._samples, time_diff)

    def reset(self):
        self._samples = []
        self._reset_time = time.time()
        if self.reset_callback_fn:
            self.reset_callback_fn()
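
A small self-contained sketch of the sample/aggregate/reset cycle; the sampling and aggregation functions below are stand-ins, not library helpers:

import random
import time
from super_gradients.common.environment.monitoring.data_models import StatAggregator

agg = StatAggregator(
    name="demo/random_value",
    sampling_fn=random.random,                                          # how one sample is taken
    aggregate_fn=lambda samples, elapsed: sum(samples) / len(samples),  # ignores elapsed, returns the mean
)
agg.reset()          # sets the reference time used by aggregate()
for _ in range(5):
    agg.sample()
    time.sleep(0.01)
print(agg.aggregate())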

get_disk_usage_percent()

Disk space used, in percent.

Source code in src/super_gradients/common/environment/monitoring/disk.py
def get_disk_usage_percent() -> float:
    """Disk memory used in percent."""
    return psutil.disk_usage("/").percent

get_io_read_mb()

Number of MegaBytes read since import

Source code in src/super_gradients/common/environment/monitoring/disk.py
def get_io_read_mb() -> float:
    """Number of MegaBytes read since import"""
    return bytes_to_megabytes(psutil.disk_io_counters().read_bytes - buffer_io_read_bytes)

get_io_write_mb()

Number of MegaBytes written since import

Source code in src/super_gradients/common/environment/monitoring/disk.py
def get_io_write_mb() -> float:
    """Number of MegaBytes written since import"""
    return bytes_to_megabytes(psutil.disk_io_counters().write_bytes - buffer_io_write_bytes)

reset_io_read()

Reset the baseline used by get_io_read_mb (disk_io_counters read bytes).

Source code in src/super_gradients/common/environment/monitoring/disk.py
def reset_io_read():
    """Reset the value of net_io_counters"""
    global buffer_io_read_bytes
    buffer_io_read_bytes = psutil.disk_io_counters().read_bytes

reset_io_write()

Reset the baseline used by get_io_write_mb (disk_io_counters write bytes).

Source code in src/super_gradients/common/environment/monitoring/disk.py
def reset_io_write():
    """Reset the value of net_io_counters"""
    global buffer_io_write_bytes
    buffer_io_write_bytes = psutil.disk_io_counters().write_bytes

count_gpus()

Count how many GPUs NVIDIA detects.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def count_gpus() -> int:
    """Count how many GPUS NVDIA detects."""
    return pynvml.nvmlDeviceGetCount()

get_device_memory_allocated_percent(gpu_index)

GPU memory allocated in percent of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_device_memory_allocated_percent(gpu_index: int) -> float:
    """GPU memory allocated in percent of a given GPU."""
    handle = get_handle_by_index(gpu_index)
    memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    return memory_info.used / memory_info.total * 100

get_device_memory_usage_percent(gpu_index)

GPU memory utilization in percent of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_device_memory_usage_percent(gpu_index: int) -> float:
    """GPU memory utilization in percent of a given GPU."""
    handle = get_handle_by_index(gpu_index)
    return pynvml.nvmlDeviceGetUtilizationRates(handle).memory

get_device_power_usage_percent(gpu_index)

GPU power usage in percent of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_device_power_usage_percent(gpu_index: int) -> float:
    """GPU power usage in percent of a given GPU."""
    handle = get_handle_by_index(gpu_index)
    power_watts = pynvml.nvmlDeviceGetPowerUsage(handle)
    power_capacity_watts = pynvml.nvmlDeviceGetEnforcedPowerLimit(handle)
    return (power_watts / power_capacity_watts) * 100

get_device_power_usage_w(gpu_index)

GPU power usage in Watts of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_device_power_usage_w(gpu_index: int) -> float:
    """GPU power usage in Watts of a given GPU."""
    handle = get_handle_by_index(gpu_index)
    return pynvml.nvmlDeviceGetPowerUsage(handle) / 1000  # Wats

get_device_temperature_c(gpu_index)

GPU temperature in Celsius of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_device_temperature_c(gpu_index: int) -> float:
    """GPU temperature in Celsius of a given GPU."""
    handle = get_handle_by_index(gpu_index)
    return pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)

get_device_usage_percent(gpu_index)

GPU utilization in percent of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_device_usage_percent(gpu_index: int) -> float:
    """GPU utilization in percent of a given GPU."""
    handle = get_handle_by_index(gpu_index)
    return pynvml.nvmlDeviceGetUtilizationRates(handle).gpu

get_handle_by_index(gpu_index)

Get the device handle of a given GPU.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def get_handle_by_index(gpu_index: int):
    """Get the device handle of a given GPU."""
    return pynvml.nvmlDeviceGetHandleByIndex(gpu_index)

init_nvidia_management_lib()

Initialize NVML (the NVIDIA Management Library), which is required to use pynvml.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def init_nvidia_management_lib():
    """Initialize nvml (NVDIA management library), which is required to use pynvml."""
    pynvml.nvmlInit()

safe_init_nvidia_management_lib()

Initialize NVML (the NVIDIA Management Library), which is required to use pynvml. Return True on success.

Source code in src/super_gradients/common/environment/monitoring/gpu/gpu.py
def safe_init_nvidia_management_lib() -> bool:
    """Initialize nvml (NVDIA management library), which is required to use pynvml. Return True on success."""
    try:
        init_nvidia_management_lib()
        return True
    except pynvml.NVMLError:
        return False

NVML_VALUE_NOT_AVAILABLE_uint = c_uint(-1) module-attribute

Field Identifiers.

All Identifiers pertain to a device. Each ID is only used once and is guaranteed never to change.

NVMLError

Bases: Exception

Source code in src/super_gradients/common/environment/monitoring/gpu/pynvml.py
class NVMLError(Exception):
    _valClassMapping = dict()
    # List of currently known error codes
    _errcode_to_string = {
        NVML_ERROR_UNINITIALIZED: "Uninitialized",
        NVML_ERROR_INVALID_ARGUMENT: "Invalid Argument",
        NVML_ERROR_NOT_SUPPORTED: "Not Supported",
        NVML_ERROR_NO_PERMISSION: "Insufficient Permissions",
        NVML_ERROR_ALREADY_INITIALIZED: "Already Initialized",
        NVML_ERROR_NOT_FOUND: "Not Found",
        NVML_ERROR_INSUFFICIENT_SIZE: "Insufficient Size",
        NVML_ERROR_INSUFFICIENT_POWER: "Insufficient External Power",
        NVML_ERROR_DRIVER_NOT_LOADED: "Driver Not Loaded",
        NVML_ERROR_TIMEOUT: "Timeout",
        NVML_ERROR_IRQ_ISSUE: "Interrupt Request Issue",
        NVML_ERROR_LIBRARY_NOT_FOUND: "NVML Shared Library Not Found",
        NVML_ERROR_FUNCTION_NOT_FOUND: "Function Not Found",
        NVML_ERROR_CORRUPTED_INFOROM: "Corrupted infoROM",
        NVML_ERROR_GPU_IS_LOST: "GPU is lost",
        NVML_ERROR_RESET_REQUIRED: "GPU requires restart",
        NVML_ERROR_OPERATING_SYSTEM: "The operating system has blocked the request.",
        NVML_ERROR_LIB_RM_VERSION_MISMATCH: "RM has detected an NVML/RM version mismatch.",
        NVML_ERROR_MEMORY: "Insufficient Memory",
        NVML_ERROR_UNKNOWN: "Unknown Error",
    }

    def __new__(typ, value):
        """
        Maps value to a proper subclass of NVMLError.
        See _extractNVMLErrorsAsClasses function for more details
        """
        if typ == NVMLError:
            typ = NVMLError._valClassMapping.get(value, typ)
        obj = Exception.__new__(typ)
        obj.value = value
        return obj

    def __str__(self):
        try:
            if self.value not in NVMLError._errcode_to_string:
                NVMLError._errcode_to_string[self.value] = str(nvmlErrorString(self.value))
            return NVMLError._errcode_to_string[self.value]
        except NVMLError:  # NVMLError_Uninitialized:
            return "NVML Error with code %d" % self.value

    def __eq__(self, other):
        return self.value == other.value

__new__(typ, value)

Maps value to a proper subclass of NVMLError. See _extractNVMLErrorsAsClasses function for more details

Source code in src/super_gradients/common/environment/monitoring/gpu/pynvml.py
def __new__(typ, value):
    """
    Maps value to a proper subclass of NVMLError.
    See _extractNVMLErrorsAsClasses function for more details
    """
    if typ == NVMLError:
        typ = NVMLError._valClassMapping.get(value, typ)
    obj = Exception.__new__(typ)
    obj.value = value
    return obj

SystemMonitor

Monitor system statistics, such as CPU usage, GPU usage, etc., and write them to TensorBoard.

Parameters:

Name Type Description Default
tensorboard_writer SummaryWriter

Tensorboard object that will be used to save the statistics

required
extra_gpu_stats bool

Set to True to collect extra GPU statistics, such as GPU temperature and power usage. Defaults to False, because the extra plots reduce TensorBoard readability.

False
Source code in src/super_gradients/common/environment/monitoring/monitoring.py
class SystemMonitor:
    """Monitor and write to tensorboard the system statistics, such as CPU usage, GPU, ...

    :param tensorboard_writer:  Tensorboard object that will be used to save the statistics
    :param extra_gpu_stats:     Set to True to get extra gpu statistics, such as gpu temperature, power usage, ...
                                Default set to False, because this reduces the tensorboard readability.
    """

    def __init__(self, tensorboard_writer: SummaryWriter, extra_gpu_stats: bool = False):
        self.tensorboard_writer = tensorboard_writer
        self.write_count = 0
        self.running = True

        self.aggregate_frequency = 30  # in sec
        self.n_samples_per_aggregate = 60
        self.sample_interval = self.aggregate_frequency / self.n_samples_per_aggregate

        self.stat_aggregators = [
            StatAggregator(name="System/disk.usage_percent", sampling_fn=disk.get_disk_usage_percent, aggregate_fn=average),
            StatAggregator(name="System/disk.io_write_mbs", sampling_fn=disk.get_io_write_mb, aggregate_fn=delta_per_s, reset_callback_fn=disk.reset_io_write),
            StatAggregator(name="System/disk.io_read_mbs", sampling_fn=disk.get_io_read_mb, aggregate_fn=delta_per_s, reset_callback_fn=disk.reset_io_read),
            StatAggregator(name="System/memory.usage_percent", sampling_fn=virtual_memory.virtual_memory_used_percent, aggregate_fn=average),
            StatAggregator(
                name="System/network.network_sent_mbs",
                sampling_fn=network.get_network_sent_mb,
                aggregate_fn=delta_per_s,
                reset_callback_fn=network.reset_network_sent,
            ),
            StatAggregator(
                name="System/network.network_recv_mbs",
                sampling_fn=network.get_network_recv_mb,
                aggregate_fn=delta_per_s,
                reset_callback_fn=network.reset_network_recv,
            ),
            StatAggregator(name="System/cpu.usage_percent", sampling_fn=cpu.get_cpu_percent, aggregate_fn=average),
        ]

        is_nvidia_lib_available = gpu.safe_init_nvidia_management_lib()
        if is_nvidia_lib_available:
            self.stat_aggregators += [
                *GPUStatAggregatorIterator(
                    name="System/gpu.memory_usage_percent", device_sampling_fn=gpu.get_device_memory_usage_percent, device_aggregate_fn=average
                ),
                *GPUStatAggregatorIterator(
                    name="System/gpu.memory_allocated_percent", device_sampling_fn=gpu.get_device_memory_allocated_percent, device_aggregate_fn=average
                ),
                *GPUStatAggregatorIterator(name="System/gpu.usage_percent", device_sampling_fn=gpu.get_device_usage_percent, device_aggregate_fn=average),
            ]

            if extra_gpu_stats:
                self.stat_aggregators += [
                    *GPUStatAggregatorIterator(name="System/gpu.temperature_c", device_sampling_fn=gpu.get_device_temperature_c, device_aggregate_fn=average),
                    *GPUStatAggregatorIterator(name="System/gpu.power_usage_w", device_sampling_fn=gpu.get_device_power_usage_w, device_aggregate_fn=average),
                    *GPUStatAggregatorIterator(
                        name="System/gpu.power_usage_percent", device_sampling_fn=gpu.get_device_power_usage_percent, device_aggregate_fn=average
                    ),
                ]

        thread = threading.Thread(target=self._run, daemon=True, name="SystemMonitor")
        thread.start()

    def _run(self):
        """Sample, aggregate and write the statistics regularly."""
        self._init_stat_aggregators()
        while self.running:
            for _ in range(self.n_samples_per_aggregate):
                self._sample()
                time.sleep(self.sample_interval)
                if not self.running:
                    break
            self._aggregate_and_write()

    def _init_stat_aggregators(self):
        for stat_aggregator in self.stat_aggregators:
            stat_aggregator.reset()

    def _sample(self):
        """Sample the stat_aggregators, i.e. get the current value of each of them."""
        for stat_aggregator in self.stat_aggregators:
            stat_aggregator.sample()

    def _aggregate_and_write(self):
        """Aggregate and write the results."""
        for stat_aggregator in self.stat_aggregators:
            scalar = stat_aggregator.aggregate()
            if scalar is not None:
                self.tensorboard_writer.add_scalar(tag=stat_aggregator.name, scalar_value=scalar, global_step=self.write_count)
            stat_aggregator.reset()
        self.write_count += 1

    @classmethod
    @multi_process_safe
    def start(cls, tensorboard_writer: SummaryWriter):
        """Instantiate a SystemMonitor in a multiprocess safe way."""
        return cls(tensorboard_writer=tensorboard_writer)

    def close(self):
        self.running = False
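
Below is a minimal usage sketch. It assumes a standard torch.utils.tensorboard SummaryWriter and imports SystemMonitor from the module shown in the "Source code" line above; note that SystemMonitor.start is wrapped in multi_process_safe, so in DDP processes other than the main one it may return None rather than a monitor instance.

from torch.utils.tensorboard import SummaryWriter
from super_gradients.common.environment.monitoring.monitoring import SystemMonitor

writer = SummaryWriter(log_dir="runs/system_stats")        # hypothetical log directory
monitor = SystemMonitor.start(tensorboard_writer=writer)   # starts a daemon thread that samples in the background

# ... training loop; stats are aggregated and written every 30 s (60 samples, 0.5 s apart) ...

if monitor is not None:    # may be None in non-main processes because of multi_process_safe
    monitor.close()        # stop the sampling loop
writer.close()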

start(tensorboard_writer) classmethod

Instantiate a SystemMonitor in a multiprocess safe way.

Source code in src/super_gradients/common/environment/monitoring/monitoring.py
@classmethod
@multi_process_safe
def start(cls, tensorboard_writer: SummaryWriter):
    """Instantiate a SystemMonitor in a multiprocess safe way."""
    return cls(tensorboard_writer=tensorboard_writer)

get_network_recv_mb()

Number of MegaBytes received since import

Source code in src/super_gradients/common/environment/monitoring/network.py
def get_network_recv_mb() -> float:
    """Number of MegaBytes received since import"""
    return bytes_to_megabytes(psutil.net_io_counters().bytes_recv - buffer_network_bytes_recv)

get_network_sent_mb()

Number of MegaBytes sent since import

Source code in src/super_gradients/common/environment/monitoring/network.py
def get_network_sent_mb() -> float:
    """Number of MegaBytes sent since import"""
    return bytes_to_megabytes(psutil.net_io_counters().bytes_sent - buffer_network_bytes_sent)

reset_network_recv()

Reset the value of net_io_counters

Source code in src/super_gradients/common/environment/monitoring/network.py
def reset_network_recv():
    """Reset the value of net_io_counters"""
    global buffer_network_bytes_recv
    buffer_network_bytes_recv = psutil.net_io_counters().bytes_recv

reset_network_sent()

Reset the value of net_io_counters

Source code in src/super_gradients/common/environment/monitoring/network.py
def reset_network_sent():
    """Reset the value of net_io_counters"""
    global buffer_network_bytes_sent
    buffer_network_bytes_sent = psutil.net_io_counters().bytes_sent
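
For context, get_network_sent_mb / get_network_recv_mb report cumulative megabytes relative to a buffer captured at import time, and the reset functions re-anchor that buffer. A small sketch (hypothetical values; functions imported from the module above):

from super_gradients.common.environment.monitoring.network import get_network_sent_mb, reset_network_sent

before = get_network_sent_mb()     # MB sent since import (or since the last reset)
# ... some network-heavy work, e.g. uploading a checkpoint ...
after = get_network_sent_mb()
print(f"sent during this window: {after - before:.2f} MB")

reset_network_sent()               # re-anchor the buffer so the next measurement window starts near 0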

average(samples, time_diff)

Average a list of values, return None if empty list

Source code in src/super_gradients/common/environment/monitoring/utils.py
def average(samples: List[float], time_diff: float) -> Union[float, None]:
    """Average a list of values, return None if empty list"""
    return sum(samples) / len(samples) if samples else None

bytes_to_megabytes(b)

Convert bytes to megabytes

Source code in src/super_gradients/common/environment/monitoring/utils.py
def bytes_to_megabytes(b: float) -> float:
    """Convert bytes to megabytes"""
    BYTES_PER_MEGABYTE = 1024**2
    return b / BYTES_PER_MEGABYTE

delta_per_s(samples, time_diff)

Compute the difference per second (ex. megabytes per second), return None if empty list

Source code in src/super_gradients/common/environment/monitoring/utils.py
def delta_per_s(samples: List[float], time_diff: float) -> Union[float, None]:
    """Compute the difference per second (ex. megabytes per second), return None if empty list"""
    return (samples[-1] - samples[0]) / time_diff if samples else None
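
A short worked example of how the two aggregation functions differ, using hypothetical samples collected over a 30-second window:

samples = [10.0, 12.5, 25.0]            # e.g. cumulative MB sent, sampled during one aggregation window
time_diff = 30.0                        # seconds between the first and the last sample

print(average(samples, time_diff))      # (10.0 + 12.5 + 25.0) / 3 ~ 15.83 -> suited to usage percentages
print(delta_per_s(samples, time_diff))  # (25.0 - 10.0) / 30.0 = 0.5 -> e.g. MB per second
print(average([], time_diff))           # None -> empty windows produce no scalar and are skipped by the writer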

virtual_memory_used_percent()

Virtual memory used in percent.

Source code in src/super_gradients/common/environment/monitoring/virtual_memory.py
def virtual_memory_used_percent() -> float:
    """Virtual memory used in percent."""
    return psutil.virtual_memory().percent

RecipeShortcutsCallback

Bases: Callback

Interpolates the shortcuts defined in variable_set.yaml:
    lr
    batch_size
    val_batch_size
    ema
    epochs
    resume: False
    num_workers

When any of the above are not set, they are populated with the original values (for example, config.lr is set to config.training_hyperparams.initial_lr) for clarity in logs.

Source code in src/super_gradients/common/environment/omegaconf_utils.py
class RecipeShortcutsCallback(Callback):
    """
    Interpolates the shortcuts defined in variable_set.yaml:
            lr
            batch_size
            val_batch_size
            ema
            epochs
            resume: False
            num_workers

    When any of the above are not set, they will be populated with the original values (for example
        config.lr will be set with config.training_hyperparams.initial_lr) for clarity in logs.

    """

    def on_run_start(self, config: DictConfig, **kwargs: Any) -> None:
        config.lr, config.training_hyperparams.initial_lr = self._override_with_shortcut(config.lr, config.training_hyperparams.initial_lr)

        config.batch_size, config.dataset_params.train_dataloader_params.batch_size = self._override_with_shortcut(
            config.batch_size, config.dataset_params.train_dataloader_params.batch_size
        )

        config.val_batch_size, config.dataset_params.val_dataloader_params.batch_size = self._override_with_shortcut(
            config.val_batch_size, config.dataset_params.val_dataloader_params.batch_size
        )

        config.resume, config.training_hyperparams.resume = self._override_with_shortcut(config.resume, config.training_hyperparams.resume)

        config.epochs, config.training_hyperparams.max_epochs = self._override_with_shortcut(config.epochs, config.training_hyperparams.max_epochs)

        config.ema, config.training_hyperparams.ema = self._override_with_shortcut(config.ema, config.training_hyperparams.ema)

        config.num_workers, config.dataset_params.train_dataloader_params.num_workers = self._override_with_shortcut(
            config.num_workers, config.dataset_params.train_dataloader_params.num_workers
        )

        config.num_workers, config.dataset_params.val_dataloader_params.num_workers = self._override_with_shortcut(
            config.num_workers, config.dataset_params.val_dataloader_params.num_workers
        )

    @staticmethod
    def _override_with_shortcut(shortcut_value, main_value):
        if shortcut_value is not None:
            value = shortcut_value
        else:
            value = main_value

        return value, value
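
A minimal sketch of the shortcut resolution logic on a hand-built OmegaConf config (the keys and values below are illustrative, not a full SuperGradients recipe):

from omegaconf import OmegaConf

from super_gradients.common.environment.omegaconf_utils import RecipeShortcutsCallback

cfg = OmegaConf.create(
    {
        "lr": 0.02,                                    # shortcut explicitly set by the user
        "batch_size": None,                            # shortcut left unset
        "training_hyperparams": {"initial_lr": 0.01},
        "dataset_params": {"train_dataloader_params": {"batch_size": 64}},
    }
)

# lr is set, so both the shortcut and the main key become 0.02
cfg.lr, cfg.training_hyperparams.initial_lr = RecipeShortcutsCallback._override_with_shortcut(
    cfg.lr, cfg.training_hyperparams.initial_lr
)

# batch_size is unset, so both fall back to the main value (64)
cfg.batch_size, cfg.dataset_params.train_dataloader_params.batch_size = RecipeShortcutsCallback._override_with_shortcut(
    cfg.batch_size, cfg.dataset_params.train_dataloader_params.batch_size
)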

get_cls(cls_path)

A resolver for Hydra/OmegaConf that returns a class instead of an instance. Usage: class_of_optimizer: ${class:torch.optim.Adam}

Source code in src/super_gradients/common/environment/omegaconf_utils.py
def get_cls(cls_path: str):
    """
    A resolver for Hydra/OmegaConf to allow getting a class instead of an instance.
    usage:
    class_of_optimizer: ${class:torch.optim.Adam}
    """
    module = ".".join(cls_path.split(".")[:-1])
    name = cls_path.split(".")[-1]
    importlib.import_module(module)
    return getattr(sys.modules[module], name)
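
For example (a small sketch; get_cls imported from the module above), the resolver hands back the class object itself rather than an instance:

import torch

from super_gradients.common.environment.omegaconf_utils import get_cls

optimizer_cls = get_cls("torch.optim.Adam")   # the class itself, not an instance
assert optimizer_cls is torch.optim.Adam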

register_hydra_resolvers()

Register all the hydra resolvers required for the super-gradients recipes.

Source code in src/super_gradients/common/environment/omegaconf_utils.py
def register_hydra_resolvers():
    """Register all the hydra resolvers required for the super-gradients recipes."""

    from super_gradients.training.datasets.detection_datasets.roboflow.utils import get_dataset_num_classes

    OmegaConf.register_new_resolver("hydra_output_dir", hydra_output_dir_resolver, replace=True)
    OmegaConf.register_new_resolver("class", lambda *args: get_cls(*args), replace=True)
    OmegaConf.register_new_resolver("add", lambda *args: sum(args), replace=True)
    OmegaConf.register_new_resolver("div", lambda x, y: x / y, replace=True)
    OmegaConf.register_new_resolver("mul", lambda *args: reduce(operator.mul, args[1:], args[0]), replace=True)
    OmegaConf.register_new_resolver("cond", lambda boolean, x, y: x if boolean else y, replace=True)
    OmegaConf.register_new_resolver("getitem", lambda container, key: container[key], replace=True)  # get item from a container (list, dict...)
    OmegaConf.register_new_resolver("first", lambda lst: lst[0], replace=True)  # get the first item from a list
    OmegaConf.register_new_resolver("last", lambda lst: lst[-1], replace=True)  # get the last item from a list
    OmegaConf.register_new_resolver("len", lambda lst: len(lst), replace=True)  # returns the length of the list

    OmegaConf.register_new_resolver("roboflow_dataset_num_classes", get_dataset_num_classes, replace=True)

get_installed_packages()

Map all the installed packages to their version.

Source code in src/super_gradients/common/environment/package_utils.py
def get_installed_packages() -> Dict[str, str]:
    """Map all the installed packages to their version."""
    return {package.key.lower(): package.version for package in pkg_resources.working_set}
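
For instance (a small usage sketch; the exact versions depend on the environment):

from super_gradients.common.environment.package_utils import get_installed_packages

packages = get_installed_packages()
print(packages.get("torch"))           # e.g. "2.0.1", or None if torch is not installed
print(len(packages), "packages found")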

normalize_path(path)

Normalize a directory or file path. Replace the Windows-style (\) path separators with Unix ones (/). This is necessary when running on Windows, since Hydra compose fails to find a configuration file if the config directory contains a backslash symbol.

Parameters:

Name   Type   Description         Default
path   str    Input path string   required

Returns:

Type   Description
str    Output path string with all \ symbols replaced with /.

Source code in src/super_gradients/common/environment/path_utils.py
def normalize_path(path: str) -> str:
    """Normalize the directory of file path. Replace the Windows-style (\\) path separators with unix ones (/).
    This is necessary when running on Windows since Hydra compose fails to find a configuration file is the config
    directory contains backward slash symbol.

    :param path: Input path string
    :return: Output path string with all \\ symbols replaces with /.
    """
    return path.replace("\\", "/")
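
A quick illustration with a hypothetical Windows path:

from super_gradients.common.environment.path_utils import normalize_path

print(normalize_path("C:\\Users\\me\\recipes"))   # -> "C:/Users/me/recipes"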