Skip to content

Utils

get_builtin_activation_type(activation, **kwargs)

Returns activation class by its name from torch.nn namespace. This function supports all modules available from torch.nn and also their lower-case aliases. On top of that, it supports a few aliases: leaky_relu (LeakyReLU), swish (SiLU).

act_cls = get_builtin_activation_type("LeakyReLU", inplace=True, negative_slope=0.01) act = act_cls()

Parameters:

Name Type Description Default
activation Union[str, None]

Activation function name (E.g. ReLU). If None - return nn.Identity

required

Returns:

Type Description
Type[nn.Module]

Type of the activation function that is ready to be instantiated

Source code in training/utils/activations_utils.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def get_builtin_activation_type(activation: Union[str, None], **kwargs) -> Type[nn.Module]:
    """
    Returns activation class by its name from torch.nn namespace. This function supports all names available in
    torch.nn, spelled either exactly (E.g. "ReLU") or in lower case (E.g. "relu").
    On top of that, it supports a few aliases: leaky_relu (LeakyReLU), swish (SiLU), none (Identity).

    >>> act_cls = get_builtin_activation_type("LeakyReLU", inplace=True, negative_slope=0.01)
    >>> act = act_cls()


    :param activation: Activation function name (E.g. ReLU). If None - return nn.Identity
    :param **kwargs  : Extra arguments to pass to constructor during instantiation (E.g. inplace=True).
                       They are ignored when activation is None.

    :returns         : Type of the activation function that is ready to be instantiated. When kwargs are given,
                       this is a functools.partial of the class with those kwargs already bound.
    :raises KeyError : If the name (after alias resolution) is not present in torch.nn
    """
    if activation is None:
        return nn.Identity

    namespace = vars(torch.nn)

    # Map every lower-cased torch.nn name back to its real spelling ("relu" -> "ReLU").
    lowercase_aliases: Dict[str, str] = {name.lower(): name for name in namespace}

    # Register additional aliases
    lowercase_aliases["leaky_relu"] = "LeakyReLU"  # LeakyReLU in snake_case
    lowercase_aliases["swish"] = "SiLU"  # Swish, which is equivalent to SiLU
    lowercase_aliases["none"] = "Identity"

    # Names that are not aliases fall through unchanged and are looked up verbatim.
    activation = lowercase_aliases.get(activation, activation)

    if activation not in namespace:
        raise KeyError(f"Requested activation function {activation} is not known")

    activation_cls = namespace[activation]
    if kwargs:
        # Bind the constructor arguments now so the caller can instantiate with a bare call.
        activation_cls = partial(activation_cls, **kwargs)

    return activation_cls

batch_distance2bbox(points, distance, max_shapes=None)

Decode distance prediction to bounding box for batch.

Parameters:

Name Type Description Default
points Tensor

[B, ..., 2], "xy" format

required
distance Tensor

[B, ..., 4], "ltrb" format

required
max_shapes Optional[Tensor]

[B, 2], "h,w" format, Shape of the image.

None

Returns:

Type Description
Tensor

Tensor: Decoded bboxes, "x1y1x2y2" format.

Source code in training/utils/bbox_utils.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def batch_distance2bbox(points: Tensor, distance: Tensor, max_shapes: Optional[Tensor] = None) -> Tensor:
    """Decode distance prediction to bounding box for batch.

    :param points: [B, ..., 2], "xy" format
    :param distance: [B, ..., 4], "ltrb" format
    :param max_shapes: [B, 2], "h,w" format, Shape of the image. When given, the decoded boxes are clipped to
                       [0, w] along x and [0, h] along y. The tensor passed in is not modified.
    :return: Tensor: Decoded bboxes, "x1y1x2y2" format.
    """
    lt, rb = torch.split(distance, 2, dim=-1)
    # while tensor add parameters, parameters should be better placed on the second place
    x1y1 = -lt + points
    x2y2 = rb + points
    out_bbox = torch.cat([x1y1, x2y2], dim=-1)
    if max_shapes is None:
        return out_bbox

    # (h, w) -> (w, h) -> (w, h, w, h), so the limits line up with (x1, y1, x2, y2).
    limits = max_shapes.flip(-1).tile([1, 2])

    # Image shapes are commonly integer tensors while boxes are floating point. Promote both operands to a
    # common dtype explicitly, because older PyTorch releases reject mixed dtypes in torch.where.
    common_dtype = torch.promote_types(out_bbox.dtype, limits.dtype)
    out_bbox = out_bbox.to(common_dtype)
    limits = limits.to(common_dtype)

    # Insert singleton dims after the batch dim until the limits broadcast against the boxes.
    for _ in range(out_bbox.ndim - limits.ndim):
        limits = limits.unsqueeze(1)

    out_bbox = torch.where(out_bbox < limits, out_bbox, limits)
    out_bbox = torch.where(out_bbox > 0, out_bbox, torch.zeros_like(out_bbox))
    return out_bbox

Callback

Base callback class with all the callback methods. Derived classes may override one or many of the available events to receive callbacks when such events are triggered by the training loop.

The order of the events is as follows:

on_training_start(context) # called once before training starts, good for setting up the warmup LR

for epoch in range(epochs):
    on_train_loader_start(context)
        for batch in train_loader:
            on_train_batch_start(context)
            on_train_batch_loss_end(context)               # called after loss has been computed
            on_train_batch_backward_end(context)           # called after .backward() was called
            on_train_batch_gradient_step_start(context)    # called before the optimizer step about to happen (gradient clipping, logging of gradients)
            on_train_batch_gradient_step_end(context)      # called after gradient step was done, good place to update LR (for step-based schedulers)
            on_train_batch_end(context)
    on_train_loader_end(context)

    on_validation_loader_start(context)
        for batch in validation_loader:
            on_validation_batch_start(context)
            on_validation_batch_end(context)
    on_validation_loader_end(context)
    on_validation_end_best_epoch(context)

on_test_loader_start(context)
    for batch in test_loader:
        on_test_batch_start(context)
        on_test_batch_end(context)
on_test_loader_end(context)

on_training_end(context) # called once after training ends.

Correspondence mapping from the old callback API:

on_training_start(context) <-> Phase.PRE_TRAINING for epoch in range(epochs): on_train_loader_start(context) <-> Phase.TRAIN_EPOCH_START for batch in train_loader: on_train_batch_start(context) on_train_batch_loss_end(context) on_train_batch_backward_end(context) <-> Phase.TRAIN_BATCH_END on_train_batch_gradient_step_start(context) on_train_batch_gradient_step_end(context) <-> Phase.TRAIN_BATCH_STEP on_train_batch_end(context) on_train_loader_end(context) <-> Phase.TRAIN_EPOCH_END

on_validation_loader_start(context)
    for batch in validation_loader:
        on_validation_batch_start(context)
        on_validation_batch_end(context)               <-> Phase.VALIDATION_BATCH_END
on_validation_loader_end(context)                      <-> Phase.VALIDATION_EPOCH_END
on_validation_end_best_epoch(context)                  <-> Phase.VALIDATION_END_BEST_EPOCH

on_test_loader_start(context) for batch in test_loader: on_test_batch_start(context) on_test_batch_end(context) <-> Phase.TEST_BATCH_END on_test_loader_end(context) <-> Phase.TEST_END

on_training_end(context) <-> Phase.POST_TRAINING

Source code in training/utils/callbacks/base_callbacks.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
class Callback:
    """
    Base callback class with all the callback methods. Derived classes may override one or many of the available events
    to receive callbacks when such events are triggered by the training loop. Every hook in this base class is a
    no-op that returns None.

    The order of the events is as follows:

    on_training_start(context)                              # called once before training starts, good for setting up the warmup LR

        for epoch in range(epochs):
            on_train_loader_start(context)
                for batch in train_loader:
                    on_train_batch_start(context)
                    on_train_batch_loss_end(context)               # called after loss has been computed
                    on_train_batch_backward_end(context)           # called after .backward() was called
                    on_train_batch_gradient_step_start(context)    # called before the optimizer step about to happen (gradient clipping, logging of gradients)
                    on_train_batch_gradient_step_end(context)      # called after gradient step was done, good place to update LR (for step-based schedulers)
                    on_train_batch_end(context)
            on_train_loader_end(context)

            on_validation_loader_start(context)
                for batch in validation_loader:
                    on_validation_batch_start(context)
                    on_validation_batch_end(context)
            on_validation_loader_end(context)
            on_validation_end_best_epoch(context)

        on_test_loader_start(context)
            for batch in test_loader:
                on_test_batch_start(context)
                on_test_batch_end(context)
        on_test_loader_end(context)

    on_training_end(context)                    # called once after training ends.

    Correspondence mapping from the old callback API:

    on_training_start(context)                                 <-> Phase.PRE_TRAINING
    for epoch in range(epochs):
        on_train_loader_start(context)                         <-> Phase.TRAIN_EPOCH_START
            for batch in train_loader:
                on_train_batch_start(context)
                on_train_batch_loss_end(context)
                on_train_batch_backward_end(context)           <-> Phase.TRAIN_BATCH_END
                on_train_batch_gradient_step_start(context)
                on_train_batch_gradient_step_end(context)      <-> Phase.TRAIN_BATCH_STEP
                on_train_batch_end(context)
        on_train_loader_end(context)                           <-> Phase.TRAIN_EPOCH_END

        on_validation_loader_start(context)
            for batch in validation_loader:
                on_validation_batch_start(context)
                on_validation_batch_end(context)               <-> Phase.VALIDATION_BATCH_END
        on_validation_loader_end(context)                      <-> Phase.VALIDATION_EPOCH_END
        on_validation_end_best_epoch(context)                  <-> Phase.VALIDATION_END_BEST_EPOCH

    on_test_loader_start(context)
        for batch in test_loader:
            on_test_batch_start(context)
            on_test_batch_end(context)                         <-> Phase.TEST_BATCH_END
    on_test_loader_end(context)                                <-> Phase.TEST_END

    on_training_end(context)                                   <-> Phase.POST_TRAINING
    """

    def on_training_start(self, context: PhaseContext) -> None:
        """
        Called once before start of the first epoch
        At this point, the context argument is guaranteed to have the following attributes:
        - optimizer
        - net
        - checkpoints_dir_path
        - criterion
        - sg_logger
        - train_loader
        - valid_loader
        - training_params
        - checkpoint_params
        - architecture
        - arch_params
        - metric_to_watch
        - device
        - ema_model

        The corresponding Phase enum value for this event is Phase.PRE_TRAINING.
        :param context: Phase context for this event.
        :return: None
        """

    def on_train_loader_start(self, context: PhaseContext) -> None:
        """
        Called each epoch at the start of train data loader (before getting the first batch).
        At this point, the context argument is guaranteed to have the following attributes:
        - epoch
        The corresponding Phase enum value for this event is Phase.TRAIN_EPOCH_START.
        :param context: Phase context for this event.
        :return: None
        """

    def on_train_batch_start(self, context: PhaseContext) -> None:
        """
        Called at each batch after getting batch of data from data loader and moving it to target device.
        This event is triggered AFTER Trainer.pre_prediction_callback call (if it was defined).

        At this point the context argument is guaranteed to have the following attributes:
        - batch_idx
        - inputs
        - targets
        - **additional_batch_items

        :param context: Phase context for this event.
        :return: None
        """

    def on_train_batch_loss_end(self, context: PhaseContext) -> None:
        """
        Called after model forward and loss computation has been done.
        At this point the context argument is guaranteed to have the following attributes:
        - preds
        - loss_log_items
        The corresponding Phase enum value for this event is Phase.TRAIN_BATCH_END.

        NOTE(review): the mapping table in the class docstring places Phase.TRAIN_BATCH_END next to
        on_train_batch_backward_end instead - confirm which of the two events is the intended counterpart.

        :param context: Phase context for this event.
        :return: None
        """

    def on_train_batch_backward_end(self, context: PhaseContext) -> None:
        """
        Called after loss.backward() method was called for a given batch

        :param context: Phase context for this event.
        :return: None
        """

    def on_train_batch_gradient_step_start(self, context: PhaseContext) -> None:
        """
        Called before the gradient step is about to happen.
        Good place to clip gradients (with respect to scaler), log gradients to data ratio, etc.
        :param context: Phase context for this event.
        :return: None
        """

    def on_train_batch_gradient_step_end(self, context: PhaseContext) -> None:
        """
        Called after gradient step has been performed. Good place to update LR (for step-based schedulers)
        The corresponding Phase enum value for this event is Phase.TRAIN_BATCH_STEP.
        :param context: Phase context for this event.
        :return: None
        """

    def on_train_batch_end(self, context: PhaseContext) -> None:
        """
        Called after all forward/backward/optimizer steps have been performed for a given batch and there is nothing left to do.

        :param context: Phase context for this event.
        :return: None
        """

    def on_train_loader_end(self, context: PhaseContext) -> None:
        """
        Called each epoch at the end of train data loader (after processing the last batch).
        The corresponding Phase enum value for this event is Phase.TRAIN_EPOCH_END.
        :param context: Phase context for this event.
        :return: None
        """

    def on_validation_loader_start(self, context: PhaseContext) -> None:
        """
        Called each epoch at the start of validation data loader (before getting the first batch).
        :param context: Phase context for this event.
        :return: None
        """

    def on_validation_batch_start(self, context: PhaseContext) -> None:
        """
        Called at each batch after getting batch of data from validation loader and moving it to target device.
        :param context: Phase context for this event.
        :return: None
        """

    def on_validation_batch_end(self, context: PhaseContext) -> None:
        """
        Called after all forward step / loss / metric computation have been performed for a given batch and there is nothing left to do.
        The corresponding Phase enum value for this event is Phase.VALIDATION_BATCH_END.
        :param context: Phase context for this event.
        :return: None
        """

    def on_validation_loader_end(self, context: PhaseContext) -> None:
        """
        Called each epoch at the end of validation data loader (after processing the last batch).
        The corresponding Phase enum value for this event is Phase.VALIDATION_EPOCH_END.
        :param context: Phase context for this event.
        :return: None
        """

    def on_validation_end_best_epoch(self, context: PhaseContext) -> None:
        """
        Called each epoch after validation has been performed and the best metric has been achieved.
        The corresponding Phase enum value for this event is Phase.VALIDATION_END_BEST_EPOCH.
        :param context: Phase context for this event.
        :return: None
        """

    def on_test_loader_start(self, context: PhaseContext) -> None:
        """
        Called once at the start of test data loader (before getting the first batch).
        :param context: Phase context for this event.
        :return: None
        """

    def on_test_batch_start(self, context: PhaseContext) -> None:
        """
        Called at each batch after getting batch of data from test loader and moving it to target device.
        :param context: Phase context for this event.
        :return: None
        """

    def on_test_batch_end(self, context: PhaseContext) -> None:
        """
        Called after all forward steps have been performed for a given batch and there is nothing left to do.
        The corresponding Phase enum value for this event is Phase.TEST_BATCH_END.
        :param context: Phase context for this event.
        :return: None
        """

    def on_test_loader_end(self, context: PhaseContext) -> None:
        """
        Called once at the end of test data loader (after processing the last batch).
        The corresponding Phase enum value for this event is Phase.TEST_END.
        :param context: Phase context for this event.
        :return: None
        """

    def on_training_end(self, context: PhaseContext) -> None:
        """
        Called once after the training loop has finished (Due to reaching optimization criterion or because of an error.)
        The corresponding Phase enum value for this event is Phase.POST_TRAINING.
        :param context: Phase context for this event.
        :return: None
        """

on_test_batch_end(context)

Called after all forward step have been performed for a given batch and there is nothing left to do. The corresponding Phase enum value for this event is Phase.TEST_BATCH_END.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
333
334
335
336
337
338
339
340
def on_test_batch_end(self, context: PhaseContext) -> None:
    """
    Hook fired for a test batch once all forward steps are done and nothing is left to do for that batch.
    Maps to Phase.TEST_BATCH_END in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_test_batch_start(context)

Called at each batch after getting batch of data from test loader and moving it to target device.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
325
326
327
328
329
330
331
def on_test_batch_start(self, context: PhaseContext) -> None:
    """
    Hook fired for every test batch, after the batch was fetched from the test loader and moved to the
    target device. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_test_loader_end(context)

Called once at the end of test data loader (after processing the last batch). The corresponding Phase enum value for this event is Phase.TEST_END.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
342
343
344
345
346
347
348
349
def on_test_loader_end(self, context: PhaseContext) -> None:
    """
    Hook fired once when the test data loader is exhausted (after the last batch was processed).
    Maps to Phase.TEST_END in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_test_loader_start(context)

Called once at the start of test data loader (before getting the first batch).

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
316
317
318
319
320
321
322
323
def on_test_loader_start(self, context: PhaseContext) -> None:
    """
    Hook fired once when the test data loader starts, before the first batch is fetched.
    The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_train_batch_backward_end(context)

Called after loss.backward() method was called for a given batch

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
225
226
227
228
229
230
231
232
def on_train_batch_backward_end(self, context: PhaseContext) -> None:
    """
    Hook fired for a training batch right after loss.backward() was called.
    The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_train_batch_end(context)

Called after all forward/backward/optimizer steps have been performed for a given batch and there is nothing left to do.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
252
253
254
255
256
257
258
259
260
def on_train_batch_end(self, context: PhaseContext) -> None:
    """
    Hook fired for a training batch once every forward/backward/optimizer step is done and nothing is left
    to do for that batch. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_train_batch_gradient_step_end(context)

Called after gradient step has been performed. Good place to update LR (for step-based schedulers) The corresponding Phase enum value for this event is Phase.TRAIN_BATCH_STEP.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
243
244
245
246
247
248
249
250
def on_train_batch_gradient_step_end(self, context: PhaseContext) -> None:
    """
    Hook fired after the gradient step has been performed; a good place to update the LR for step-based
    schedulers. Maps to Phase.TRAIN_BATCH_STEP in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_train_batch_gradient_step_start(context)

Called before the gradient step is about to happen. Good place to clip gradients (with respect to scaler), log gradients to data ratio, etc.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
234
235
236
237
238
239
240
241
def on_train_batch_gradient_step_start(self, context: PhaseContext) -> None:
    """
    Hook fired just before the gradient step happens. Good place to clip gradients (with respect to
    scaler), log gradients to data ratio, etc. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_train_batch_loss_end(context)

Called after model forward and loss computation has been done. At this point the context argument is guaranteed to have the following attributes: - preds - loss_log_items The corresponding Phase enum value for this event is Phase.TRAIN_BATCH_END.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
211
212
213
214
215
216
217
218
219
220
221
222
223
def on_train_batch_loss_end(self, context: PhaseContext) -> None:
    """
    Hook fired once the model forward pass and the loss computation are done for a training batch.
    Maps to Phase.TRAIN_BATCH_END in the Phase enum. The base implementation is a no-op.

    Attributes guaranteed to be present on the context at this point:
    - preds
    - loss_log_items

    :param context: Phase context for this event.
    :return: None
    """

on_train_batch_start(context)

Called at each batch after getting batch of data from data loader and moving it to target device. This event triggered AFTER Trainer.pre_prediction_callback call (If it was defined).

At this point the context argument is guaranteed to have the following attributes: - batch_idx - inputs - targets - **additional_batch_items

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def on_train_batch_start(self, context: PhaseContext) -> None:
    """
    Hook fired for every training batch, after the batch was fetched from the data loader and moved to the
    target device. It is triggered AFTER the Trainer.pre_prediction_callback call (if one was defined).
    The base implementation is a no-op.

    Attributes guaranteed to be present on the context at this point:
    - batch_idx
    - inputs
    - targets
    - **additional_batch_items

    :param context: Phase context for this event.
    :return: None
    """

on_train_loader_end(context)

Called each epoch at the end of train data loader (after processing the last batch). The corresponding Phase enum value for this event is Phase.TRAIN_EPOCH_END.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
262
263
264
265
266
267
268
269
270
def on_train_loader_end(self, context: PhaseContext) -> None:
    """
    Hook fired every epoch when the train data loader is exhausted (after the last batch was processed).
    Maps to Phase.TRAIN_EPOCH_END in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_train_loader_start(context)

Called each epoch at the start of train data loader (before getting the first batch). At this point, the context argument is guaranteed to have the following attributes: - epoch The corresponding Phase enum value for this event is Phase.TRAIN_EPOCH_START.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
184
185
186
187
188
189
190
191
192
193
def on_train_loader_start(self, context: PhaseContext) -> None:
    """
    Hook fired every epoch when the train data loader starts, before the first batch is fetched.
    Maps to Phase.TRAIN_EPOCH_START in the Phase enum. The base implementation is a no-op.

    Attributes guaranteed to be present on the context at this point:
    - epoch

    :param context: Phase context for this event.
    :return: None
    """

on_training_end(context)

Called once after the training loop has finished (Due to reaching optimization criterion or because of an error.) The corresponding Phase enum value for this event is Phase.POST_TRAINING.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
351
352
353
354
355
356
357
358
def on_training_end(self, context: PhaseContext) -> None:
    """
    Hook fired once after the training loop has finished, whether it stopped because the optimization
    criterion was reached or because of an error.
    Maps to Phase.POST_TRAINING in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_training_start(context)

Called once before start of the first epoch At this point, the context argument is guaranteed to have the following attributes: - optimizer - net - checkpoints_dir_path - criterion - sg_logger - train_loader - valid_loader - training_params - checkpoint_params - architecture - arch_params - metric_to_watch - device - ema_model

The corresponding Phase enum value for this event is Phase.PRE_TRAINING.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def on_training_start(self, context: PhaseContext) -> None:
    """
    Hook fired once, before the first epoch starts.
    Maps to Phase.PRE_TRAINING in the Phase enum. The base implementation is a no-op.

    Attributes guaranteed to be present on the context at this point:
    - optimizer
    - net
    - checkpoints_dir_path
    - criterion
    - sg_logger
    - train_loader
    - valid_loader
    - training_params
    - checkpoint_params
    - architecture
    - arch_params
    - metric_to_watch
    - device
    - ema_model

    :param context: Phase context for this event.
    :return: None
    """

on_validation_batch_end(context)

Called after all forward step / loss / metric computation have been performed for a given batch and there is nothing left to do. The corresponding Phase enum value for this event is Phase.VALIDATION_BATCH_END.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
289
290
291
292
293
294
295
296
def on_validation_batch_end(self, context: PhaseContext) -> None:
    """
    Hook fired for a validation batch once all forward step / loss / metric computation is done and nothing
    is left to do for that batch.
    Maps to Phase.VALIDATION_BATCH_END in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_validation_batch_start(context)

Called at each batch after getting batch of data from validation loader and moving it to target device.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
281
282
283
284
285
286
287
def on_validation_batch_start(self, context: PhaseContext) -> None:
    """
    Hook fired for every validation batch, after the batch was fetched from the validation loader and moved
    to the target device. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_validation_end_best_epoch(context)

Called each epoch after validation has been performed and the best metric has been achieved. The corresponding Phase enum value for this event is Phase.VALIDATION_END_BEST_EPOCH.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
307
308
309
310
311
312
313
314
def on_validation_end_best_epoch(self, context: PhaseContext) -> None:
    """
    Hook fired each epoch after validation has been performed and the best metric has been achieved.
    Maps to Phase.VALIDATION_END_BEST_EPOCH in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_validation_loader_end(context)

Called each epoch at the end of validation data loader (after processing the last batch). The corresponding Phase enum value for this event is Phase.VALIDATION_EPOCH_END.

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
298
299
300
301
302
303
304
305
def on_validation_loader_end(self, context: PhaseContext) -> None:
    """
    Hook fired every epoch when the validation data loader is exhausted (after the last batch was processed).
    Maps to Phase.VALIDATION_EPOCH_END in the Phase enum. The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

on_validation_loader_start(context)

Called each epoch at the start of validation data loader (before getting the first batch).

Parameters:

Name Type Description Default
context PhaseContext required

Returns:

Type Description
None
Source code in training/utils/callbacks/base_callbacks.py
272
273
274
275
276
277
278
279
def on_validation_loader_start(self, context: PhaseContext) -> None:
    """
    Hook fired every epoch when the validation data loader starts, before the first batch is fetched.
    The base implementation is a no-op.

    :param context: Phase context for this event.
    :return: None
    """

CallbackHandler

Bases: Callback

Runs all callbacks

Parameters:

Name Type Description Default
callbacks List[Callback]

Callbacks to be run.

required
Source code in training/utils/callbacks/base_callbacks.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
class CallbackHandler(Callback):
    """
    Composite callback that forwards every event it receives to each of the wrapped callbacks.

    Callbacks are invoked in the order in which they appear in the list given at construction.

    :param callbacks: Callbacks to be run.
    """

    def __init__(self, callbacks: List[Callback]):
        # TODO: Reorder callbacks so that dependent callbacks always run in the right order.
        # Example: Gradient Clipping & Gradient Logging - gradients must be clipped first and logged afterwards,
        # so even if the user registered them in the wrong order we could still guarantee a correct one.
        # One way to get there is a priority property on each callback:
        # Forward   = 0
        # Loss      = 100
        # Backward  = 200
        # Metrics   = 300
        # Scheduler = 400
        # Logging   = 500
        # Sorting by that priority would run all Forward-related callbacks first (for a given event),
        # then backward ones, and only then - logging.
        self.callbacks = callbacks

    def _broadcast(self, event_name: str, context: PhaseContext) -> None:
        """
        Call the hook named `event_name` on every wrapped callback, in list order.

        :param event_name: Name of the hook method to invoke on each callback.
        :param context:    Context object passed through unchanged to each hook.
        """
        for registered in self.callbacks:
            getattr(registered, event_name)(context)

    def on_training_start(self, context: PhaseContext) -> None:
        self._broadcast("on_training_start", context)

    def on_train_loader_start(self, context: PhaseContext) -> None:
        self._broadcast("on_train_loader_start", context)

    def on_train_batch_start(self, context: PhaseContext) -> None:
        self._broadcast("on_train_batch_start", context)

    def on_train_batch_loss_end(self, context: PhaseContext) -> None:
        self._broadcast("on_train_batch_loss_end", context)

    def on_train_batch_backward_end(self, context: PhaseContext) -> None:
        self._broadcast("on_train_batch_backward_end", context)

    def on_train_batch_gradient_step_start(self, context: PhaseContext) -> None:
        self._broadcast("on_train_batch_gradient_step_start", context)

    def on_train_batch_gradient_step_end(self, context: PhaseContext) -> None:
        self._broadcast("on_train_batch_gradient_step_end", context)

    def on_train_batch_end(self, context: PhaseContext) -> None:
        self._broadcast("on_train_batch_end", context)

    def on_validation_loader_start(self, context: PhaseContext) -> None:
        self._broadcast("on_validation_loader_start", context)

    def on_validation_batch_start(self, context: PhaseContext) -> None:
        self._broadcast("on_validation_batch_start", context)

    def on_validation_batch_end(self, context: PhaseContext) -> None:
        self._broadcast("on_validation_batch_end", context)

    def on_validation_loader_end(self, context: PhaseContext) -> None:
        self._broadcast("on_validation_loader_end", context)

    def on_train_loader_end(self, context: PhaseContext) -> None:
        self._broadcast("on_train_loader_end", context)

    def on_training_end(self, context: PhaseContext) -> None:
        self._broadcast("on_training_end", context)

    def on_validation_end_best_epoch(self, context: PhaseContext) -> None:
        self._broadcast("on_validation_end_best_epoch", context)

    def on_test_loader_start(self, context: PhaseContext) -> None:
        self._broadcast("on_test_loader_start", context)

    def on_test_batch_start(self, context: PhaseContext) -> None:
        self._broadcast("on_test_batch_start", context)

    def on_test_batch_end(self, context: PhaseContext) -> None:
        self._broadcast("on_test_batch_end", context)

    def on_test_loader_end(self, context: PhaseContext) -> None:
        self._broadcast("on_test_loader_end", context)

PhaseCallback

Bases: Callback

Kept here to keep backward compatibility with old code. New callbacks should use Callback class instead. This callback supports receiving only a subset of events defined in Phase enum:

PRE_TRAINING = "PRE_TRAINING" TRAIN_EPOCH_START = "TRAIN_EPOCH_START" TRAIN_BATCH_END = "TRAIN_BATCH_END" TRAIN_BATCH_STEP = "TRAIN_BATCH_STEP" TRAIN_EPOCH_END = "TRAIN_EPOCH_END"

VALIDATION_BATCH_END = "VALIDATION_BATCH_END" VALIDATION_EPOCH_END = "VALIDATION_EPOCH_END" VALIDATION_END_BEST_EPOCH = "VALIDATION_END_BEST_EPOCH"

TEST_BATCH_END = "TEST_BATCH_END" TEST_END = "TEST_END" POST_TRAINING = "POST_TRAINING"

Source code in training/utils/callbacks/base_callbacks.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
class PhaseCallback(Callback):
    """
    Legacy single-phase callback, kept for backward compatibility with old code.
    New callbacks should derive from Callback directly.

    An instance is bound to exactly one Phase; when the matching event hook fires, the instance
    itself is called with the context. Only this subset of the Phase enum is supported:

    PRE_TRAINING = "PRE_TRAINING"
    TRAIN_EPOCH_START = "TRAIN_EPOCH_START"
    TRAIN_BATCH_END = "TRAIN_BATCH_END"
    TRAIN_BATCH_STEP = "TRAIN_BATCH_STEP"
    TRAIN_EPOCH_END = "TRAIN_EPOCH_END"

    VALIDATION_BATCH_END = "VALIDATION_BATCH_END"
    VALIDATION_EPOCH_END = "VALIDATION_EPOCH_END"
    VALIDATION_END_BEST_EPOCH = "VALIDATION_END_BEST_EPOCH"

    TEST_BATCH_END = "TEST_BATCH_END"
    TEST_END = "TEST_END"
    POST_TRAINING = "POST_TRAINING"

    :param phase: The phase on which this callback should be triggered.
    """

    def __init__(self, phase: Phase):
        self.phase = phase

    def __call__(self, *args, **kwargs):
        # Subclasses provide the actual behaviour.
        raise NotImplementedError

    def __repr__(self) -> str:
        return type(self).__name__

    def _call_if_phase(self, expected: Phase, context: PhaseContext) -> None:
        """Invoke this callback with `context` only when it is bound to the `expected` phase."""
        if self.phase == expected:
            self(context)

    def on_training_start(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.PRE_TRAINING, context)

    def on_train_loader_start(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.TRAIN_EPOCH_START, context)

    def on_train_batch_loss_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.TRAIN_BATCH_END, context)

    def on_train_batch_gradient_step_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.TRAIN_BATCH_STEP, context)

    def on_train_loader_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.TRAIN_EPOCH_END, context)

    def on_validation_batch_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.VALIDATION_BATCH_END, context)

    def on_validation_loader_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.VALIDATION_EPOCH_END, context)

    def on_validation_end_best_epoch(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.VALIDATION_END_BEST_EPOCH, context)

    def on_test_batch_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.TEST_BATCH_END, context)

    def on_test_loader_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.TEST_END, context)

    def on_training_end(self, context: PhaseContext) -> None:
        self._call_if_phase(Phase.POST_TRAINING, context)

PhaseContext

Represents the input for phase callbacks, and is constantly updated after callback calls.

Source code in training/utils/callbacks/base_callbacks.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class PhaseContext:
    """
    Represents the input for phase callbacks, and is constantly updated after callback calls.

    Every constructor argument is stored on the instance under the same name (all default to None).
    In addition, `stop_training` is always initialised to False; it is not a constructor argument.
    """

    def __init__(
        self,
        epoch=None,
        batch_idx=None,
        optimizer=None,
        metrics_dict=None,
        inputs=None,
        preds=None,
        target=None,
        metrics_compute_fn=None,
        loss_avg_meter=None,
        loss_log_items=None,
        criterion=None,
        device=None,
        experiment_name=None,
        ckpt_dir=None,
        net=None,
        lr_warmup_epochs=None,
        sg_logger=None,
        train_loader=None,
        valid_loader=None,
        training_params=None,
        ddp_silent_mode=None,
        checkpoint_params=None,
        architecture=None,
        arch_params=None,
        metric_idx_in_results_tuple=None,
        metric_to_watch=None,
        valid_metrics=None,
        context_methods=None,
        ema_model=None,
    ):
        self.epoch = epoch
        self.batch_idx = batch_idx
        self.optimizer = optimizer
        self.inputs = inputs
        self.preds = preds
        self.target = target
        self.metrics_dict = metrics_dict
        self.metrics_compute_fn = metrics_compute_fn
        self.loss_avg_meter = loss_avg_meter
        self.loss_log_items = loss_log_items
        self.criterion = criterion
        self.device = device
        # Not configurable through the constructor: a fresh context never requests a stop.
        self.stop_training = False
        self.experiment_name = experiment_name
        self.ckpt_dir = ckpt_dir
        self.net = net
        self.lr_warmup_epochs = lr_warmup_epochs
        self.sg_logger = sg_logger
        self.train_loader = train_loader
        self.valid_loader = valid_loader
        self.training_params = training_params
        self.ddp_silent_mode = ddp_silent_mode
        self.checkpoint_params = checkpoint_params
        self.architecture = architecture
        self.arch_params = arch_params
        # Previously accepted but silently discarded; keep it so callers that pass it can read it back.
        self.metric_idx_in_results_tuple = metric_idx_in_results_tuple
        self.metric_to_watch = metric_to_watch
        self.valid_metrics = valid_metrics
        self.context_methods = context_methods
        self.ema_model = ema_model

    def update_context(self, **kwargs):
        """
        Set (or create) one attribute on the context per keyword argument.

        :param kwargs: Attribute name -> new value. Names are not validated, so unknown names create new attributes.
        """
        for attr, attr_val in kwargs.items():
            setattr(self, attr, attr_val)

BatchStepLinearWarmupLRCallback

Bases: Callback

LR scheduling callback for linear step warmup on each batch step. LR climbs linearly from warmup_initial_lr to initial_lr.

Source code in training/utils/callbacks/callbacks.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
@register_lr_warmup(LRWarmups.LINEAR_BATCH_STEP)
class BatchStepLinearWarmupLRCallback(Callback):
    """
    Per-batch linear LR warmup.

    The learning rate climbs linearly from warmup_initial_lr to initial_lr, advancing one value on every
    training batch until the warmup steps are exhausted.
    """

    def __init__(
        self,
        warmup_initial_lr: float,
        initial_lr: float,
        train_loader_len: int,
        update_param_groups: bool,
        lr_warmup_steps: int,
        training_params,
        net,
        **kwargs,
    ):
        """
        :param warmup_initial_lr:   Starting learning rate.
        :param initial_lr:          Target learning rate after warmup.
        :param train_loader_len:    Length of train data loader.
        :param update_param_groups: When True, the LR is applied through net.module.update_param_groups instead of
                                    being written directly into the optimizer's param groups.
        :param lr_warmup_steps:     Number of batch steps to warm up over. Capped at train_loader_len.
        :param training_params:     Training parameters, forwarded to net.module.update_param_groups.
        :param net:                 Network being trained; only used when update_param_groups is True.
        :param kwargs:              Ignored.
        """

        super(BatchStepLinearWarmupLRCallback, self).__init__()

        warmup_exceeds_epoch = lr_warmup_steps > train_loader_len
        if warmup_exceeds_epoch:
            logger.warning(
                f"Number of warmup steps ({lr_warmup_steps}) is greater than number of steps in epoch ({train_loader_len}). "
                f"Warmup steps will be capped to number of steps in epoch to avoid interfering with any pre-epoch LR schedulers."
            )

        capped_steps = min(lr_warmup_steps, train_loader_len)

        # One LR value per warmup step, both endpoints included.
        self.learning_rates = np.linspace(start=warmup_initial_lr, stop=initial_lr, num=capped_steps, endpoint=True)
        self.lr_warmup_steps = capped_steps
        self.train_loader_len = train_loader_len

        self.initial_lr = initial_lr
        self.lr = initial_lr

        self.update_param_groups = update_param_groups
        self.training_params = training_params
        self.net = net

    def on_train_batch_start(self, context: PhaseContext) -> None:
        """Apply the warmup LR for the current global step, if warmup is still in progress."""
        step_index = context.batch_idx + context.epoch * self.train_loader_len
        if step_index >= self.lr_warmup_steps:
            return

        self.lr = float(self.learning_rates[step_index])
        self.update_lr(context.optimizer, context.epoch, context.batch_idx)

    def update_lr(self, optimizer, epoch, batch_idx=None):
        """
        Push self.lr into the optimizer (same as in LRCallbackBase).

        :param optimizer: Optimizer whose param groups are updated.
        :param epoch:     Current epoch, forwarded to net.module.update_param_groups.
        :param batch_idx: Current batch index, forwarded to net.module.update_param_groups.
        :return: None
        """
        if not self.update_param_groups:
            # Plain path: write the LR into every param group.
            for group in optimizer.param_groups:
                group["lr"] = self.lr
            return

        # Delegated path: the network decides how the LR maps onto its param groups.
        optimizer.param_groups = self.net.module.update_param_groups(
            optimizer.param_groups, self.lr, epoch, batch_idx, self.training_params, self.train_loader_len
        )

__init__(warmup_initial_lr, initial_lr, train_loader_len, update_param_groups, lr_warmup_steps, training_params, net, **kwargs)

Parameters:

Name Type Description Default
warmup_initial_lr float

Starting learning rate

required
initial_lr float

Target learning rate after warmup

required
train_loader_len int

Length of train data loader

required
lr_warmup_steps int

Number of batch steps over which the LR is warmed up. Capped at train_loader_len.

required
kwargs {}
Source code in training/utils/callbacks/callbacks.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def __init__(
    self,
    warmup_initial_lr: float,
    initial_lr: float,
    train_loader_len: int,
    update_param_groups: bool,
    lr_warmup_steps: int,
    training_params,
    net,
    **kwargs,
):
    """
    :param warmup_initial_lr:   Starting learning rate.
    :param initial_lr:          Target learning rate after warmup.
    :param train_loader_len:    Length of train data loader.
    :param update_param_groups: Stored as-is on the instance.
    :param lr_warmup_steps:     Number of batch steps to warm up over. Capped at train_loader_len.
    :param training_params:     Stored as-is on the instance.
    :param net:                 Stored as-is on the instance.
    :param kwargs:              Ignored.
    """

    super(BatchStepLinearWarmupLRCallback, self).__init__()

    warmup_exceeds_epoch = lr_warmup_steps > train_loader_len
    if warmup_exceeds_epoch:
        logger.warning(
            f"Number of warmup steps ({lr_warmup_steps}) is greater than number of steps in epoch ({train_loader_len}). "
            f"Warmup steps will be capped to number of steps in epoch to avoid interfering with any pre-epoch LR schedulers."
        )

    capped_steps = min(lr_warmup_steps, train_loader_len)

    # One LR value per warmup step, both endpoints included.
    self.learning_rates = np.linspace(start=warmup_initial_lr, stop=initial_lr, num=capped_steps, endpoint=True)
    self.lr_warmup_steps = capped_steps
    self.train_loader_len = train_loader_len

    self.initial_lr = initial_lr
    self.lr = initial_lr

    self.update_param_groups = update_param_groups
    self.training_params = training_params
    self.net = net

update_lr(optimizer, epoch, batch_idx=None)

Same as in LRCallbackBase

Parameters:

Name Type Description Default
optimizer required
epoch required
batch_idx None

Returns:

Type Description
Source code in training/utils/callbacks/callbacks.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def update_lr(self, optimizer, epoch, batch_idx=None):
    """
    Push self.lr into the optimizer (same as in LRCallbackBase).

    :param optimizer: Optimizer whose param groups are updated.
    :param epoch:     Current epoch, forwarded to net.module.update_param_groups.
    :param batch_idx: Current batch index, forwarded to net.module.update_param_groups.
    :return: None
    """
    if not self.update_param_groups:
        # Plain path: write the LR into every param group.
        for group in optimizer.param_groups:
            group["lr"] = self.lr
        return

    # Delegated path: the network decides how the LR maps onto its param groups.
    optimizer.param_groups = self.net.module.update_param_groups(
        optimizer.param_groups, self.lr, epoch, batch_idx, self.training_params, self.train_loader_len
    )

BinarySegmentationVisualizationCallback

Bases: PhaseCallback

A callback that adds a visualization of a batch of segmentation predictions to context.sg_logger

Parameters:

Name Type Description Default
phase Phase

When to trigger the callback.

required
freq int

Frequency (in epochs) to perform this callback.

required
batch_idx int

Batch index to perform visualization for.

0
last_img_idx_in_batch int

Last image index to add to log. (default=-1, will take entire batch).

-1
Source code in training/utils/callbacks/callbacks.py
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
class BinarySegmentationVisualizationCallback(PhaseCallback):
    """
    A callback that adds a visualization of a batch of segmentation predictions to context.sg_logger

    :param phase:                   When to trigger the callback.
    :param freq:                    Frequency (in epochs) to perform this callback.
    :param batch_idx:               Batch index to perform visualization for.
    :param last_img_idx_in_batch:   Last image index to add to log. (default=-1, will take entire batch).
    """

    def __init__(self, phase: Phase, freq: int, batch_idx: int = 0, last_img_idx_in_batch: int = -1):
        super(BinarySegmentationVisualizationCallback, self).__init__(phase)
        self.freq = freq
        self.batch_idx = batch_idx
        self.last_img_idx_in_batch = last_img_idx_in_batch

    def __call__(self, context: PhaseContext):
        """
        Log the visualized batch when the epoch is a multiple of `freq` and the batch index matches.

        :param context: Phase context providing epoch, batch_idx, inputs, preds, target and sg_logger.
        """
        if context.epoch % self.freq == 0 and context.batch_idx == self.batch_idx:
            # Clone so the visualization cannot modify the predictions held by the context.
            if isinstance(context.preds, tuple):
                preds = context.preds[0].clone()
            else:
                preds = context.preds.clone()
            batch_imgs = BinarySegmentationVisualization.visualize_batch(context.inputs, preds, context.target, self.batch_idx)
            batch_imgs = [cv2.cvtColor(image, cv2.COLOR_BGR2RGB) for image in batch_imgs]
            batch_imgs = np.stack(batch_imgs)
            tag = "batch_" + str(self.batch_idx) + "_images"
            # -1 is the documented "entire batch" sentinel. Slicing with [:-1] would silently drop the last
            # image, so map the sentinel to an open-ended slice; any other value keeps its slice semantics.
            stop = None if self.last_img_idx_in_batch == -1 else self.last_img_idx_in_batch
            context.sg_logger.add_images(tag=tag, images=batch_imgs[:stop], global_step=context.epoch, data_format="NHWC")

ContextSgMethods

Class for delegating Trainer's methods, so that only the relevant ones ("phase wise") are accessible.

Source code in training/utils/callbacks/callbacks.py
30
31
32
33
34
35
36
37
class ContextSgMethods:
    """
    Holder that exposes a chosen subset of the Trainer's methods ("phase wise") as attributes.

    Each keyword argument given at construction becomes an attribute with the same name.
    """

    def __init__(self, **methods):
        for method_name in methods:
            setattr(self, method_name, methods[method_name])

CosineLRCallback

Bases: LRCallbackBase

Hard coded step Cosine annealing learning rate scheduling.

Source code in training/utils/callbacks/callbacks.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
@register_lr_scheduler(LRSchedulers.COSINE)
class CosineLRCallback(LRCallbackBase):
    """
    Hard coded step Cosine annealing learning rate scheduling.
    """

    def __init__(self, max_epochs, cosine_final_lr_ratio, **kwargs):
        """
        :param max_epochs:            Total number of epochs, used to size the cosine schedule.
        :param cosine_final_lr_ratio: Ratio between the final LR of the schedule and initial_lr.
        :param kwargs:                Forwarded to LRCallbackBase.
        """
        super(CosineLRCallback, self).__init__(Phase.TRAIN_BATCH_STEP, **kwargs)
        self.max_epochs = max_epochs
        self.cosine_final_lr_ratio = cosine_final_lr_ratio

    def perform_scheduling(self, context):
        """Compute the cosine LR for the current iteration and push it to the optimizer."""
        params = self.training_params
        steps_per_epoch = self.train_loader_len

        # Warmup and cooldown epochs are excluded from the cosine schedule.
        epochs_past_warmup = context.epoch - params.lr_warmup_epochs
        schedule_epochs = self.max_epochs - params.lr_warmup_epochs - params.lr_cooldown_epochs

        iters_done = max(0, steps_per_epoch * epochs_past_warmup + context.batch_idx - params.lr_warmup_steps)
        iters_total = steps_per_epoch * schedule_epochs - params.lr_warmup_steps

        self.lr = float(self.compute_learning_rate(iters_done, iters_total, self.initial_lr, self.cosine_final_lr_ratio))
        self.update_lr(context.optimizer, context.epoch, context.batch_idx)

    def is_lr_scheduling_enabled(self, context):
        """Tell whether the cosine schedule should run at the current epoch / batch."""
        params = self.training_params

        # Account of per-step warmup
        if params.lr_warmup_steps > 0:
            steps_so_far = self.train_loader_len * context.epoch + context.batch_idx
            return steps_so_far >= params.lr_warmup_steps

        cooldown_start_epoch = params.max_epochs - params.lr_cooldown_epochs
        return params.lr_warmup_epochs <= context.epoch < cooldown_start_epoch

    @classmethod
    def compute_learning_rate(cls, step: Union[float, np.ndarray], total_steps: float, initial_lr: float, final_lr_ratio: float):
        """
        Evaluate the cosine schedule at `step`.

        The cosine starts from initial_lr and reaches initial_lr * final_lr_ratio in the last epoch.

        :param step:           Current step (scalar or array of steps).
        :param total_steps:    Number of steps in the schedule.
        :param initial_lr:     LR at step 0.
        :param final_lr_ratio: Ratio between the final LR and initial_lr.
        :return:               LR value(s) for `step`.
        """
        angle = step / (total_steps + 1) * math.pi
        cosine_lr = 0.5 * initial_lr * (1.0 + np.cos(angle))
        return cosine_lr * (1 - final_lr_ratio) + (initial_lr * final_lr_ratio)

DeciLabUploadCallback

Bases: PhaseCallback

Post-training callback for uploading and optimizing a model.

Parameters:

Name Type Description Default
model_meta_data

Model's meta-data object. Type: ModelMetadata

required
optimization_request_form

Optimization request form object. Type: OptimizationRequestForm

required
ckpt_name str

Checkpoint filename, inside the checkpoint directory.

'ckpt_best.pth'
Source code in training/utils/callbacks/callbacks.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@register_callback(Callbacks.DECI_LAB_UPLOAD)
class DeciLabUploadCallback(PhaseCallback):
    """
    Post-training callback for uploading and optimizing a model.

    :param model_meta_data:             Model's meta-data object. Type: ModelMetadata
    :param optimization_request_form:   Optimization request form object. Type: OptimizationRequestForm
    :param ckpt_name:                   Checkpoint filename, inside the checkpoint directory.
    """

    def __init__(self, model_meta_data, optimization_request_form, ckpt_name: str = "ckpt_best.pth", **kwargs):
        super().__init__(phase=Phase.POST_TRAINING)
        self.model_meta_data = model_meta_data
        self.optimization_request_form = optimization_request_form
        self.ckpt_name = ckpt_name
        self.platform_client = DeciClient()

    @staticmethod
    def log_optimization_failed():
        logger.info("We couldn't finish your model optimization. Visit https://console.deci.ai for details")

    def upload_model(self, model):
        """
        This function will upload the trained model to the Deci Lab

        :param model: The resulting model from the training process
        """
        self.platform_client.upload_model(model=model, model_meta_data=self.model_meta_data, optimization_request_form=self.optimization_request_form)

    def get_optimization_status(self, optimized_model_name: str):
        """
        Poll the benchmark status of the optimized version of the trained model.

        The status is checked against the server every 30 seconds, and polling gives up after 30 minutes.

        :param optimized_model_name: Optimized model name

        :return: True when the model is no longer being benchmarked, False when polling timed out.
        """

        class _OptimizationTimeout(Exception):
            """Raised by the SIGALRM handler to break out of the polling loop."""

        def handler(_signum, _frame):
            # A signal handler's return value is ignored, so the only way to stop the loop is to raise.
            raise _OptimizationTimeout

        previous_handler = signal.signal(signal.SIGALRM, handler)
        signal.alarm(1800)

        try:
            while self.platform_client.is_model_benchmarking(name=optimized_model_name):
                time.sleep(30)
        except _OptimizationTimeout:
            logger.error("Process timed out. Visit https://console.deci.ai for details")
            return False
        finally:
            # Always cancel the pending alarm and put back whatever handler was installed before us.
            signal.alarm(0)
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

        return True

    def __call__(self, context: PhaseContext) -> None:
        """
        This function will attempt to upload the trained model and schedule an optimization for it.

        Any exception is logged and reported as a failed optimization; nothing is re-raised.

        :param context: Training phase context
        """
        try:
            # Work on a copy so loading the checkpoint weights does not touch the network held by the context.
            model = copy.deepcopy(context.net)
            model_state_dict_path = os.path.join(context.ckpt_dir, self.ckpt_name)
            model_state_dict = torch.load(model_state_dict_path)["net"]
            model.load_state_dict(state_dict=model_state_dict)

            model = model.module.cpu()
            if hasattr(model, "prep_model_for_conversion"):
                model.prep_model_for_conversion(input_size=self.model_meta_data.input_dimensions)

            self.upload_model(model=model)
            model_name = self.model_meta_data.name
            logger.info(f"Successfully added {model_name} to the model repository")

            optimized_model_name = f"{model_name}_1_1"
            logger.info("We'll wait for the scheduled optimization to finish. Please don't close this window")
            success = self.get_optimization_status(optimized_model_name=optimized_model_name)
            if success:
                logger.info("Successfully finished your model optimization. Visit https://console.deci.ai for details")
            else:
                DeciLabUploadCallback.log_optimization_failed()
        except Exception as ex:
            DeciLabUploadCallback.log_optimization_failed()
            logger.error(ex)

__call__(context)

This function will attempt to upload the trained model and schedule an optimization for it.

Parameters:

Name Type Description Default
context PhaseContext

Training phase context

required
Source code in training/utils/callbacks/callbacks.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def __call__(self, context: PhaseContext) -> None:
    """
    Upload the trained model and wait for its scheduled optimization.

    Any exception raised along the way is logged and reported as a failed optimization; nothing is re-raised.

    :param context: Training phase context
    """
    try:
        # Load the checkpoint weights into a copy of the network held by the context.
        checkpoint_path = os.path.join(context.ckpt_dir, self.ckpt_name)
        net_copy = copy.deepcopy(context.net)
        net_copy.load_state_dict(state_dict=torch.load(checkpoint_path)["net"])

        exported = net_copy.module.cpu()
        if hasattr(exported, "prep_model_for_conversion"):
            exported.prep_model_for_conversion(input_size=self.model_meta_data.input_dimensions)

        self.upload_model(model=exported)
        model_name = self.model_meta_data.name
        logger.info(f"Successfully added {model_name} to the model repository")

        logger.info("We'll wait for the scheduled optimization to finish. Please don't close this window")
        finished_ok = self.get_optimization_status(optimized_model_name=f"{model_name}_1_1")
        if not finished_ok:
            DeciLabUploadCallback.log_optimization_failed()
        else:
            logger.info("Successfully finished your model optimization. Visit https://console.deci.ai for details")
    except Exception as ex:
        DeciLabUploadCallback.log_optimization_failed()
        logger.error(ex)

get_optimization_status(optimized_model_name)

This function will fetch the optimized version of the trained model and check on its benchmark status. The status will be checked against the server every 30 seconds and the process will timeout after 30 minutes or log about the successful optimization - whichever happens first.

Parameters:

Name Type Description Default
optimized_model_name str

Optimized model name

required

Returns:

Type Description

Whether or not the optimized model has been benchmarked

Source code in training/utils/callbacks/callbacks.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def get_optimization_status(self, optimized_model_name: str):
    """
    Poll the benchmark status of the optimized version of the trained model.

    The status is checked against the server every 30 seconds, and polling gives up after 30 minutes.

    :param optimized_model_name: Optimized model name

    :return: True when the model is no longer being benchmarked, False when polling timed out.
    """

    class _OptimizationTimeout(Exception):
        """Raised by the SIGALRM handler to break out of the polling loop."""

    def handler(_signum, _frame):
        # A signal handler's return value is ignored, so the only way to stop the loop is to raise.
        raise _OptimizationTimeout

    previous_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(1800)

    try:
        while self.platform_client.is_model_benchmarking(name=optimized_model_name):
            time.sleep(30)
    except _OptimizationTimeout:
        logger.error("Process timed out. Visit https://console.deci.ai for details")
        return False
    finally:
        # Always cancel the pending alarm and put back whatever handler was installed before us.
        signal.alarm(0)
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

    return True

upload_model(model)

This function will upload the trained model to the Deci Lab

Parameters:

Name Type Description Default
model

The resulting model from the training process

required
Source code in training/utils/callbacks/callbacks.py
140
141
142
143
144
145
146
def upload_model(self, model):
    """
    Send the trained model to the Deci Lab through the platform client.

    The model is uploaded together with the meta-data and the optimization request form stored on this callback.

    :param model: The resulting model from the training process
    """
    upload_kwargs = dict(
        model=model,
        model_meta_data=self.model_meta_data,
        optimization_request_form=self.optimization_request_form,
    )
    self.platform_client.upload_model(**upload_kwargs)

DetectionVisualizationCallback

Bases: PhaseCallback

A callback that adds a visualization of a batch of detection predictions to context.sg_logger

Parameters:

Name Type Description Default
phase Phase

When to trigger the callback.

required
freq int

Frequency (in epochs) to perform this callback.

required
batch_idx int

Batch index to perform visualization for.

0
classes list

Class list of the dataset.

required
last_img_idx_in_batch int

Last image index to add to log. (default=-1, will take entire batch).

-1
Source code in training/utils/callbacks/callbacks.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
@register_callback(Callbacks.DETECTION_VISUALIZATION_CALLBACK)
class DetectionVisualizationCallback(PhaseCallback):
    """
    A callback that adds a visualization of a batch of detection predictions to context.sg_logger

    :param phase:                    When to trigger the callback.
    :param freq:                     Frequency (in epochs) to perform this callback.
    :param post_prediction_callback: Callable applied to the cloned raw predictions before they are visualized.
    :param classes:                  Class list of the dataset.
    :param batch_idx:                Batch index to perform visualization for.
    :param last_img_idx_in_batch:    Last image index to add to log. (default=-1, will take entire batch).
    """

    def __init__(
        self,
        phase: Phase,
        freq: int,
        post_prediction_callback: DetectionPostPredictionCallback,
        classes: list,
        batch_idx: int = 0,
        last_img_idx_in_batch: int = -1,
    ):
        super(DetectionVisualizationCallback, self).__init__(phase)
        self.freq = freq
        self.post_prediction_callback = post_prediction_callback
        self.batch_idx = batch_idx
        self.classes = classes
        self.last_img_idx_in_batch = last_img_idx_in_batch

    def __call__(self, context: PhaseContext):
        """
        Log the visualized batch when the epoch is a multiple of `freq` and the batch index matches.

        :param context: Phase context providing epoch, batch_idx, inputs, preds, target and sg_logger.
        """
        if context.epoch % self.freq == 0 and context.batch_idx == self.batch_idx:
            # SOME CALCULATIONS ARE IN-PLACE IN NMS, SO CLONE THE PREDICTIONS
            preds = (context.preds[0].clone(), None)
            preds = self.post_prediction_callback(preds)
            batch_imgs = DetectionVisualization.visualize_batch(context.inputs, preds, context.target, self.batch_idx, self.classes)
            batch_imgs = [cv2.cvtColor(image, cv2.COLOR_BGR2RGB) for image in batch_imgs]
            batch_imgs = np.stack(batch_imgs)
            tag = "batch_" + str(self.batch_idx) + "_images"
            # -1 is the documented "entire batch" sentinel. Slicing with [:-1] would silently drop the last
            # image, so map the sentinel to an open-ended slice; any other value keeps its slice semantics.
            stop = None if self.last_img_idx_in_batch == -1 else self.last_img_idx_in_batch
            context.sg_logger.add_images(tag=tag, images=batch_imgs[:stop], global_step=context.epoch, data_format="NHWC")

EpochStepWarmupLRCallback

Bases: LRCallbackBase

LR scheduling callback for linear step warmup. This scheduler uses a whole epoch as single step. LR climbs from warmup_initial_lr with even steps to initial lr. When warmup_initial_lr is None - LR climb starts from initial_lr/(1+warmup_epochs).

Source code in training/utils/callbacks/callbacks.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
@register_lr_warmup(LRWarmups.LINEAR_EPOCH_STEP)
class EpochStepWarmupLRCallback(LRCallbackBase):
    """
    LR scheduling callback for linear step warmup. This scheduler uses a whole epoch as single step.
    LR climbs from warmup_initial_lr with even steps to initial lr. When warmup_initial_lr is None - LR climb starts from
     initial_lr/(1+warmup_epochs).

    The callback is registered for Phase.TRAIN_EPOCH_START. All keyword arguments are forwarded to LRCallbackBase.
    """

    def __init__(self, **kwargs):
        super(EpochStepWarmupLRCallback, self).__init__(Phase.TRAIN_EPOCH_START, **kwargs)
        warmup_epochs = self.training_params.lr_warmup_epochs

        # Only fall back to the default when the value is really unset. A plain `or` would also replace an explicit
        # warmup_initial_lr of 0, which is a legitimate starting point for the climb.
        warmup_initial_lr = self.training_params.warmup_initial_lr
        if warmup_initial_lr is None:
            warmup_initial_lr = self.initial_lr / (warmup_epochs + 1)
        self.warmup_initial_lr = warmup_initial_lr

        # Per-epoch LR increment; 0 when no warmup epochs are configured (also avoids dividing by zero).
        self.warmup_step_size = (self.initial_lr - self.warmup_initial_lr) / warmup_epochs if warmup_epochs > 0 else 0

    def perform_scheduling(self, context):
        """
        Set the LR to warmup_initial_lr plus one warmup step per elapsed epoch and apply it to the optimizer.

        :param context: current phase's context; its epoch and optimizer are read.
        """
        self.lr = self.warmup_initial_lr + context.epoch * self.warmup_step_size
        self.update_lr(context.optimizer, context.epoch, None)

    def is_lr_scheduling_enabled(self, context):
        """
        :param context: current phase's context.
        :return: True when warmup is configured (lr_warmup_epochs > 0) and context.epoch <= lr_warmup_epochs.
        """
        return self.training_params.lr_warmup_epochs > 0 and self.training_params.lr_warmup_epochs >= context.epoch

ExponentialLRCallback

Bases: LRCallbackBase

Exponential decay learning rate scheduling. Decays the learning rate by lr_decay_factor every epoch.

Source code in training/utils/callbacks/callbacks.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
@register_lr_scheduler(LRSchedulers.EXP)
class ExponentialLRCallback(LRCallbackBase):
    """
    Exponential decay learning rate scheduling. Decays the learning rate by `lr_decay_factor` every epoch.

    The callback is registered for Phase.TRAIN_BATCH_STEP, and the exponent applied to `lr_decay_factor` is the number
    of post-warmup iterations divided by the train loader length.

    :param lr_decay_factor: Base of the exponential decay.
    """

    def __init__(self, lr_decay_factor: float, **kwargs):
        super().__init__(phase=Phase.TRAIN_BATCH_STEP, **kwargs)
        self.lr_decay_factor = lr_decay_factor

    def perform_scheduling(self, context):
        """
        Compute the decayed LR for the current iteration and apply it to the optimizer.

        :param context: current phase's context; its epoch, batch_idx and optimizer are read.
        """
        epochs_after_warmup = context.epoch - self.training_params.lr_warmup_epochs
        iters_done = self.train_loader_len * epochs_after_warmup + context.batch_idx
        decay_exponent = iters_done / self.train_loader_len
        self.lr = self.initial_lr * self.lr_decay_factor**decay_exponent
        self.update_lr(context.optimizer, context.epoch, context.batch_idx)

    def is_lr_scheduling_enabled(self, context):
        """
        :param context: current phase's context.
        :return: True for epochs in [lr_warmup_epochs, max_epochs - lr_cooldown_epochs).
        """
        params = self.training_params
        cooldown_start_epoch = params.max_epochs - params.lr_cooldown_epochs
        return params.lr_warmup_epochs <= context.epoch < cooldown_start_epoch

FunctionLRCallback

Bases: LRCallbackBase

Hard coded rate scheduling for user defined lr scheduling function.

Source code in training/utils/callbacks/callbacks.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
@register_lr_scheduler(LRSchedulers.FUNCTION)
class FunctionLRCallback(LRCallbackBase):
    """
    Hard coded rate scheduling for user defined lr scheduling function.

    The user function is called with the keyword arguments initial_lr, epoch, iter, max_epoch and iters_per_epoch,
    and its return value becomes the new learning rate.

    :param max_epochs:              Total number of epochs; warmup and cooldown epochs are subtracted from it before
                                    it is passed to the user function.
    :param lr_schedule_function:    Callable computing the learning rate.
    """

    @deprecated(version="3.2.0", reason="This callback is deprecated and will be removed in future versions.")
    def __init__(self, max_epochs, lr_schedule_function, **kwargs):
        super(FunctionLRCallback, self).__init__(Phase.TRAIN_BATCH_STEP, **kwargs)
        assert callable(lr_schedule_function), "self.lr_function must be callable"
        self.max_epochs = max_epochs
        self.lr_schedule_function = lr_schedule_function

    def is_lr_scheduling_enabled(self, context):
        """
        :param context: current phase's context.
        :return: True for epochs in [lr_warmup_epochs, max_epochs - lr_cooldown_epochs).
        """
        params = self.training_params
        cooldown_start_epoch = params.max_epochs - params.lr_cooldown_epochs
        return params.lr_warmup_epochs <= context.epoch < cooldown_start_epoch

    def perform_scheduling(self, context):
        """
        Ask the user function for the LR of the current iteration and apply it to the optimizer.

        :param context: current phase's context; its epoch, batch_idx and optimizer are read.
        """
        warmup_epochs = self.training_params.lr_warmup_epochs
        schedule_kwargs = dict(
            initial_lr=self.initial_lr,
            epoch=context.epoch - warmup_epochs,
            iter=context.batch_idx,
            max_epoch=self.max_epochs - warmup_epochs - self.training_params.lr_cooldown_epochs,
            iters_per_epoch=self.train_loader_len,
        )
        self.lr = self.lr_schedule_function(**schedule_kwargs)
        self.update_lr(context.optimizer, context.epoch, context.batch_idx)

IllegalLRSchedulerMetric

Bases: Exception

Exception raised for an illegal combination of training parameters.

Parameters:

Name Type Description Default
metric_name str

Name of the metric that is not supported.

required
metrics_dict dict

Dictionary of metrics that are supported.

required
Source code in training/utils/callbacks/callbacks.py
503
504
505
506
507
508
509
510
511
512
class IllegalLRSchedulerMetric(Exception):
    """Exception raised for a metric name that is not available in the metrics dictionary.

    The formatted text is stored on ``self.message`` and also passed to ``Exception``.

    :param metric_name: Name of the metric that is not supported.
    :param metrics_dict: Dictionary of metrics that are supported.
    """

    def __init__(self, metric_name: str, metrics_dict: dict):
        supported_keys = str(metrics_dict.keys())
        message = "Illegal metric name: " + metric_name + ". Expected one of metics_dics keys: " + supported_keys
        self.message = message
        super().__init__(message)

LRCallbackBase

Bases: PhaseCallback

Base class for hard coded learning rate scheduling regimes, implemented as callbacks.

Source code in training/utils/callbacks/callbacks.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
@register_callback(Callbacks.LR_CALLBACK_BASE)
class LRCallbackBase(PhaseCallback):
    """
    Base class for hard coded learning rate scheduling regimes, implemented as callbacks.

    Subclasses provide `is_lr_scheduling_enabled` and `perform_scheduling`; `__call__` runs the latter only when the
    former returns a truthy value.
    """

    def __init__(self, phase, initial_lr, update_param_groups, train_loader_len, net, training_params, **kwargs):
        """
        :param phase:               Phase in which the callback is triggered (forwarded to PhaseCallback).
        :param initial_lr:          Initial learning rate; also the starting value of `self.lr`.
        :param update_param_groups: When truthy, `update_lr` delegates to `net.module.update_param_groups`.
        :param train_loader_len:    Length of the train loader.
        :param net:                 The network; only `net.module.update_param_groups` is accessed here.
        :param training_params:     Training parameters object.
        :param kwargs:              Accepted and ignored.
        """
        super(LRCallbackBase, self).__init__(phase)
        self.initial_lr = self.lr = initial_lr
        self.update_param_groups = update_param_groups
        self.train_loader_len = train_loader_len
        self.net = net
        self.training_params = training_params

    def __call__(self, context: PhaseContext, **kwargs):
        if not self.is_lr_scheduling_enabled(context):
            return
        self.perform_scheduling(context)

    def is_lr_scheduling_enabled(self, context: PhaseContext):
        """
        Predicate that controls whether to perform lr scheduling based on values in context.

        :param context: PhaseContext: current phase's context.
        :return: bool, whether to apply lr scheduling or not.
        """
        raise NotImplementedError

    def perform_scheduling(self, context: PhaseContext):
        """
        Performs lr scheduling based on values in context.

        :param context: PhaseContext: current phase's context.
        """
        raise NotImplementedError

    def update_lr(self, optimizer, epoch, batch_idx=None):
        """
        Apply `self.lr` to the optimizer.

        :param optimizer: Optimizer whose param_groups are updated.
        :param epoch:     Current epoch, forwarded to `net.module.update_param_groups`.
        :param batch_idx: Current batch index (or None), forwarded to `net.module.update_param_groups`.
        """
        if not self.update_param_groups:
            # UPDATE THE OPTIMIZERS PARAMETER
            for group in optimizer.param_groups:
                group["lr"] = self.lr
            return

        # The network supplies its own param groups, which replace the optimizer's.
        optimizer.param_groups = self.net.module.update_param_groups(
            optimizer.param_groups, self.lr, epoch, batch_idx, self.training_params, self.train_loader_len
        )

is_lr_scheduling_enabled(context)

Predicate that controls whether to perform lr scheduling based on values in context.

Parameters:

Name Type Description Default
context PhaseContext

PhaseContext: current phase's context.

required

Returns:

Type Description

bool, whether to apply lr scheduling or not.

Source code in training/utils/callbacks/callbacks.py
227
228
229
230
231
232
233
234
def is_lr_scheduling_enabled(self, context: PhaseContext):
    """
    Predicate that controls whether to perform lr scheduling based on values in context.

    This implementation is a placeholder that always raises; the actual predicate has to be supplied by an override.

    :param context: PhaseContext: current phase's context.
    :return: bool, whether to apply lr scheduling or not.
    :raises NotImplementedError: Always.
    """
    raise NotImplementedError

perform_scheduling(context)

Performs lr scheduling based on values in context.

Parameters:

Name Type Description Default
context PhaseContext

PhaseContext: current phase's context.

required
Source code in training/utils/callbacks/callbacks.py
236
237
238
239
240
241
242
def perform_scheduling(self, context: PhaseContext):
    """
    Performs lr scheduling based on values in context.

    This implementation is a placeholder that always raises; the actual scheduling has to be supplied by an override.

    :param context: PhaseContext: current phase's context.
    :raises NotImplementedError: Always.
    """
    raise NotImplementedError

LRSchedulerCallback

Bases: PhaseCallback

Learning rate scheduler callback.

When passing __call__ a metrics_dict with a key=self.metric_name, the value of that metric will be monitored for ReduceLROnPlateau (i.e. step(metrics_dict[self.metric_name])).

Parameters:

Name Type Description Default
scheduler torch.optim.lr_scheduler._LRScheduler

Learning rate scheduler to be called step() with.

required
metric_name str

Metric name for ReduceLROnPlateau learning rate scheduler.

None
phase Phase

Phase of when to trigger it.

required
Source code in training/utils/callbacks/callbacks.py
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
@register_callback(Callbacks.LR_SCHEDULER)
class LRSchedulerCallback(PhaseCallback):
    """
    Learning rate scheduler callback.

    When `metric_name` is set and present in context.metrics_dict, the scheduler is stepped with that metric's value
         (as needed for ReduceLROnPlateau, i.e step(metrics_dict[self.metric_name])). When `metric_name` is None the
         scheduler is stepped without arguments. Nothing happens while context.epoch < context.lr_warmup_epochs.

    :param scheduler:       Learning rate scheduler to be called step() with.
    :param metric_name:     Metric name for ReduceLROnPlateau learning rate scheduler.
    :param phase:           Phase of when to trigger it.
    """

    def __init__(self, scheduler: torch.optim.lr_scheduler._LRScheduler, phase: Phase, metric_name: str = None):
        super(LRSchedulerCallback, self).__init__(phase)
        self.scheduler = scheduler
        self.metric_name = metric_name

    def __call__(self, context: PhaseContext):
        """
        Step the wrapped scheduler once the warmup epochs have passed.

        :param context: current phase's context; its epoch, lr_warmup_epochs and metrics_dict are read.
        :raises IllegalLRSchedulerMetric: when a metric name was given but is not found in context.metrics_dict.
        """
        if not (context.lr_warmup_epochs <= context.epoch):
            return

        monitored = self.metric_name
        if monitored is None:
            self.scheduler.step()
        elif monitored and monitored in context.metrics_dict.keys():
            self.scheduler.step(context.metrics_dict[monitored])
        else:
            raise IllegalLRSchedulerMetric(monitored, context.metrics_dict)

    def __repr__(self):
        scheduler_repr = repr(self.scheduler)
        return "LRSchedulerCallback: " + scheduler_repr

LinearStepWarmupLRCallback

Bases: EpochStepWarmupLRCallback

Deprecated, use EpochStepWarmupLRCallback instead

Source code in training/utils/callbacks/callbacks.py
278
279
280
281
282
283
284
285
286
287
@register_lr_warmup(LRWarmups.LINEAR_STEP)
class LinearStepWarmupLRCallback(EpochStepWarmupLRCallback):
    """Deprecated alias of EpochStepWarmupLRCallback that logs a deprecation warning on construction."""

    def __init__(self, **kwargs):
        deprecation_notice = (
            f"Parameter {LRWarmups.LINEAR_STEP} has been made deprecated and will be removed in the next SG release. "
            f"Please use `{LRWarmups.LINEAR_EPOCH_STEP}` instead."
        )
        logger.warning(deprecation_notice)
        super(LinearStepWarmupLRCallback, self).__init__(**kwargs)

ModelConversionCheckCallback

Bases: PhaseCallback

Pre-training callback that verifies model conversion to onnx given specified conversion parameters.

The model is converted, then inference is applied with onnx runtime.

Use this callback with the same args as DeciPlatformCallback to prevent conversion failures at the end of training.

Parameters:

Name Type Description Default
model_meta_data

Model's meta-data object. Type: ModelMetadata/

required
opset_version

(default=10)

required
do_constant_folding

(default=True)

required
dynamic_axes

(default={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}})

required
input_names

(default=["input"])

required
output_names

(default=["output"])

required
rtol

(default=1e-03)

required
atol

(default=1e-05)

required
Source code in training/utils/callbacks/callbacks.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@register_callback(Callbacks.MODEL_CONVERSION_CHECK)
class ModelConversionCheckCallback(PhaseCallback):
    """
    Pre-training callback that verifies model conversion to onnx given specified conversion parameters.

    The model is converted, then inference is applied with onnx runtime.

    Use this callback with the same args as DeciPlatformCallback to prevent conversion fails at the end of training.

    :param model_meta_data:         Model's meta-data object. Type: ModelMetadata
    :param opset_version:           (default=10)
    :param do_constant_folding:     (default=True)
    :param dynamic_axes:            (default={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}})
    :param input_names:             (default=["input"])
    :param output_names:            (default=["output"])
    :param rtol:                    (default=1e-03)
    :param atol:                    (default=1e-05)
    """

    def __init__(self, model_meta_data, **kwargs):
        super(ModelConversionCheckCallback, self).__init__(phase=Phase.PRE_TRAINING)
        self.model_meta_data = model_meta_data

        self.opset_version = kwargs.get("opset_version", 10)

        # Default to True only when the option is missing or None, so an explicit False is respected.
        do_constant_folding = kwargs.get("do_constant_folding")
        self.do_constant_folding = True if do_constant_folding is None else do_constant_folding

        self.input_names = kwargs.get("input_names") or ["input"]
        self.output_names = kwargs.get("output_names") or ["output"]
        self.dynamic_axes = kwargs.get("dynamic_axes") or {"input": {0: "batch_size"}, "output": {0: "batch_size"}}

        self.rtol = kwargs.get("rtol", 1e-03)
        self.atol = kwargs.get("atol", 1e-05)

    def __call__(self, context: PhaseContext):
        """
        Export a CPU copy of the network to a temporary ONNX file and compare its ONNX Runtime output with PyTorch.

        :param context: current phase's context; its net.module and ckpt_dir are read.
        :raises AssertionError: (from np.testing.assert_allclose) when the outputs differ beyond rtol/atol.
        """
        # Work on a deep copy so the network being trained is left untouched.
        model = copy.deepcopy(context.net.module)
        model = model.cpu()
        model.eval()  # Put model into eval mode

        if hasattr(model, "prep_model_for_conversion"):
            model.prep_model_for_conversion(input_size=self.model_meta_data.input_dimensions)

        x = torch.randn(self.model_meta_data.primary_batch_size, *self.model_meta_data.input_dimensions, requires_grad=False)

        tmp_model_path = os.path.join(context.ckpt_dir, self.model_meta_data.name + "_tmp.onnx")

        with torch.no_grad():
            torch_out = model(x)

        try:
            torch.onnx.export(
                model,  # Model being run
                x,  # Model input (or a tuple for multiple inputs)
                tmp_model_path,  # Where to save the model (can be a file or file-like object)
                export_params=True,  # Store the trained parameter weights inside the model file
                opset_version=self.opset_version,
                do_constant_folding=self.do_constant_folding,
                input_names=self.input_names,
                output_names=self.output_names,
                dynamic_axes=self.dynamic_axes,
            )

            onnx_model = onnx.load(tmp_model_path)
            onnx.checker.check_model(onnx_model)

            ort_session = onnxruntime.InferenceSession(tmp_model_path, providers=["CUDAExecutionProvider", "CPUExecutionProvider"])

            # compute ONNX Runtime output prediction
            ort_inputs = {ort_session.get_inputs()[0].name: x.cpu().numpy()}
            ort_outs = ort_session.run(None, ort_inputs)

            # TODO: Ideally we don't want to check this but have the certainty of just calling torch_out.cpu()
            if isinstance(torch_out, (list, tuple)):
                torch_out = torch_out[0]
            # compare ONNX Runtime and PyTorch results
            np.testing.assert_allclose(torch_out.cpu().numpy(), ort_outs[0], rtol=self.rtol, atol=self.atol)
        finally:
            # Always clean up the temporary export, including when the export or the comparison fails.
            if os.path.exists(tmp_model_path):
                os.remove(tmp_model_path)

        logger.info("Exported model has been tested with ONNXRuntime, and the result looks good!")

PhaseContextTestCallback

Bases: PhaseCallback

A callback that saves the phase context for testing.

Source code in training/utils/callbacks/callbacks.py
568
569
570
571
572
573
574
575
576
577
578
class PhaseContextTestCallback(PhaseCallback):
    """
    A callback that saves the phase context for testing.

    The most recent context passed to `__call__` is kept on `self.context` (None until the first call).

    :param phase: Phase in which the callback is triggered.
    """

    def __init__(self, phase: Phase):
        super(PhaseContextTestCallback, self).__init__(phase)
        # No context has been captured yet.
        self.context = None

    def __call__(self, context: PhaseContext):
        # Keep a reference to the received context (no copy is made).
        self.context = context

PolyLRCallback

Bases: LRCallbackBase

Hard coded polynomial decay learning rate scheduling (i.e at specific milestones).

Source code in training/utils/callbacks/callbacks.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
@register_lr_scheduler(LRSchedulers.POLY)
class PolyLRCallback(LRCallbackBase):
    """
    Hard coded polynomial decay learning rate scheduling.

    The LR is computed as initial_lr * (1 - current_iter / max_iter) ** 0.9, where both iteration counts exclude the
    warmup epochs and are divided by training_params.batch_accumulate.

    :param max_epochs: Total number of epochs; warmup and cooldown epochs are subtracted from it.
    """

    def __init__(self, max_epochs, **kwargs):
        super(PolyLRCallback, self).__init__(Phase.TRAIN_BATCH_STEP, **kwargs)
        self.max_epochs = max_epochs

    def perform_scheduling(self, context):
        """
        Compute the polynomially decayed LR for the current iteration and apply it to the optimizer.

        :param context: current phase's context; its epoch, batch_idx and optimizer are read.
        """
        params = self.training_params
        epochs_after_warmup = context.epoch - params.lr_warmup_epochs
        scheduled_epochs = self.max_epochs - params.lr_warmup_epochs - params.lr_cooldown_epochs
        iters_done = (self.train_loader_len * epochs_after_warmup + context.batch_idx) / params.batch_accumulate
        iters_total = self.train_loader_len * scheduled_epochs / params.batch_accumulate
        remaining_fraction = 1.0 - (iters_done / iters_total)
        self.lr = self.initial_lr * pow(remaining_fraction, 0.9)
        self.update_lr(context.optimizer, context.epoch, context.batch_idx)

    def is_lr_scheduling_enabled(self, context):
        """
        :param context: current phase's context.
        :return: True for epochs in [lr_warmup_epochs, max_epochs - lr_cooldown_epochs).
        """
        params = self.training_params
        cooldown_start_epoch = params.max_epochs - params.lr_cooldown_epochs
        return params.lr_warmup_epochs <= context.epoch < cooldown_start_epoch

RoboflowResultCallback

Bases: Callback

Append the training results to a csv file. Be aware that this does not fully overwrite the existing file, just appends.

Source code in training/utils/callbacks/callbacks.py
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
@register_callback(Callbacks.ROBOFLOW_RESULT_CALLBACK)
class RoboflowResultCallback(Callback):
    """Append the training results to a csv file. Be aware that this does not fully overwrite the existing file, just appends."""

    def __init__(self, dataset_name: str, output_path: Optional[str] = None):
        """
        :param dataset_name:    Name of the dataset that was used to train the model.
        :param output_path:     Full path to the output csv file. By default, save at 'checkpoint_dir/results.csv'
        """
        self.dataset_name = dataset_name

        resolved_path = output_path or os.path.join(get_project_checkpoints_dir_path(), "results.csv")
        self.output_path = resolved_path
        if resolved_path is None:
            raise ValueError("Output path must be specified")

        super(RoboflowResultCallback, self).__init__()

    @multi_process_safe
    def on_training_end(self, context: PhaseContext):
        """
        Append one `[dataset_name, mAP]` row to the csv file.

        :param context: current phase's context; the "mAP@0.50:0.95" entry of its metrics_dict is read.
        """
        # The file is opened in append mode, so earlier rows are kept.
        with open(self.output_path, mode="a", newline="") as results_file:
            row_writer = csv.writer(results_file)
            map_value = context.metrics_dict["mAP@0.50:0.95"].item()
            row_writer.writerow([self.dataset_name, map_value])

__init__(dataset_name, output_path=None)

Parameters:

Name Type Description Default
dataset_name str

Name of the dataset that was used to train the model.

required
output_path Optional[str]

Full path to the output csv file. By default, save at 'checkpoint_dir/results.csv'

None
Source code in training/utils/callbacks/callbacks.py
703
704
705
706
707
708
709
710
711
712
713
714
def __init__(self, dataset_name: str, output_path: Optional[str] = None):
    """
    Store the dataset name and resolve the csv output path.

    :param dataset_name:    Name of the dataset that was used to train the model.
    :param output_path:     Full path to the output csv file. By default, save at 'checkpoint_dir/results.csv'
    :raises ValueError:     If the resolved output path is None.
    """
    self.dataset_name = dataset_name
    # A falsy output_path (None or "") falls back to "results.csv" joined onto get_project_checkpoints_dir_path().
    self.output_path = output_path or os.path.join(get_project_checkpoints_dir_path(), "results.csv")

    # NOTE(review): os.path.join returns a string, so this guard looks unreachable - confirm before relying on it.
    if self.output_path is None:
        raise ValueError("Output path must be specified")

    super(RoboflowResultCallback, self).__init__()

StepLRCallback

Bases: LRCallbackBase

Hard coded step learning rate scheduling (i.e at specific milestones).

Source code in training/utils/callbacks/callbacks.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
@register_lr_scheduler(LRSchedulers.STEP)
class StepLRCallback(LRCallbackBase):
    """
    Hard coded step learning rate scheduling (i.e at specific milestones).

    :param lr_updates:          Epochs at which the LR is multiplied by `lr_decay_factor`.
    :param lr_decay_factor:     Multiplicative decay applied once per passed milestone.
    :param step_lr_update_freq: When given, milestones are generated at multiples of this frequency (rounded up),
                                restricted to [lr_warmup_epochs, max_epochs - lr_cooldown_epochs). Mutually exclusive
                                with a non-empty `lr_updates`.
    """

    def __init__(self, lr_updates, lr_decay_factor, step_lr_update_freq=None, **kwargs):
        super(StepLRCallback, self).__init__(Phase.TRAIN_EPOCH_END, **kwargs)
        if step_lr_update_freq and len(lr_updates):
            raise ValueError("Only one of [lr_updates, step_lr_update_freq] should be passed to StepLRCallback constructor")

        if step_lr_update_freq:
            upper_epoch = self.training_params.max_epochs - self.training_params.lr_cooldown_epochs
            lower_epoch = self.training_params.lr_warmup_epochs
            # Candidate milestones: every multiple of the update frequency, rounded up to a whole epoch.
            candidates = (int(np.ceil(step_lr_update_freq * multiple)) for multiple in range(1, upper_epoch))
            lr_updates = [epoch for epoch in candidates if lower_epoch <= epoch < upper_epoch]
        elif self.training_params.lr_cooldown_epochs > 0:
            logger.warning("Specific lr_updates were passed along with cooldown_epochs > 0, cooldown will have no effect.")

        self.lr_decay_factor = lr_decay_factor
        self.lr_updates = lr_updates

    def perform_scheduling(self, context):
        """
        Decay the initial LR once for every milestone that is <= the current epoch and apply it to the optimizer.

        :param context: current phase's context; its epoch and optimizer are read.
        """
        milestones_passed = sum(1 for milestone in self.lr_updates if milestone <= context.epoch)
        self.lr = self.initial_lr * self.lr_decay_factor**milestones_passed
        self.update_lr(context.optimizer, context.epoch, None)

    def is_lr_scheduling_enabled(self, context):
        """
        :param context: current phase's context.
        :return: True once context.epoch has reached lr_warmup_epochs.
        """
        warmup_epochs = self.training_params.lr_warmup_epochs
        return warmup_epochs <= context.epoch

TestLRCallback

Bases: PhaseCallback

Phase callback that collects the learning rates in lr_placeholder at the end of each epoch (used for testing). In the case of multiple parameter groups (i.e multiple learning rates) the learning rate is collected from the first one. The phase is VALIDATION_EPOCH_END to ensure all lr updates have been performed before calling this callback.

Source code in training/utils/callbacks/callbacks.py
726
727
728
729
730
731
732
733
734
735
736
737
738
class TestLRCallback(PhaseCallback):
    """
    Phase callback (used for testing) that records learning rates into `lr_placeholder`.

    The callback is registered for Phase.VALIDATION_EPOCH_END so that all lr updates have been performed before it
     runs. With multiple parameter groups (i.e multiple learning rates) only the first group's learning rate is
     collected.

    :param lr_placeholder: Object with an `append` method that receives one learning rate per call.
    """

    def __init__(self, lr_placeholder):
        super(TestLRCallback, self).__init__(Phase.VALIDATION_EPOCH_END)
        self.lr_placeholder = lr_placeholder

    def __call__(self, context: PhaseContext):
        first_group = context.optimizer.param_groups[0]
        self.lr_placeholder.append(first_group["lr"])

TrainingStageSwitchCallbackBase

Bases: PhaseCallback

TrainingStageSwitchCallback

A phase callback that is called at a specific epoch (epoch start) to support multi-stage training. It does so by manipulating the objects inside the context.

Parameters:

Name Type Description Default
next_stage_start_epoch int

Epoch idx to apply the stage change.

required
Source code in training/utils/callbacks/callbacks.py
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
class TrainingStageSwitchCallbackBase(PhaseCallback):
    """
    TrainingStageSwitchCallback

    A phase callback (Phase.TRAIN_EPOCH_START) that fires at one specific epoch to support multi-stage training.
    The stage change itself is implemented by subclasses in `apply_stage_change`, which manipulates the objects inside
    the context.

    :param next_stage_start_epoch: Epoch idx to apply the stage change.
    """

    def __init__(self, next_stage_start_epoch: int):
        super(TrainingStageSwitchCallbackBase, self).__init__(phase=Phase.TRAIN_EPOCH_START)
        self.next_stage_start_epoch = next_stage_start_epoch

    def __call__(self, context: PhaseContext):
        # Every epoch other than the configured one is a no-op.
        if context.epoch != self.next_stage_start_epoch:
            return
        self.apply_stage_change(context)

    def apply_stage_change(self, context: PhaseContext):
        """
        This method is called when the callback is fired on the next_stage_start_epoch,
         and holds the stage change logic that should be applied to the context's objects.

        :param context: PhaseContext, context of current phase
        """
        raise NotImplementedError

apply_stage_change(context)

This method is called when the callback is fired on the next_stage_start_epoch, and holds the stage change logic that should be applied to the context's objects.

Parameters:

Name Type Description Default
context PhaseContext

PhaseContext, context of current phase

required
Source code in training/utils/callbacks/callbacks.py
668
669
670
671
672
673
674
675
def apply_stage_change(self, context: PhaseContext):
    """
    This method is called when the callback is fired on the next_stage_start_epoch,
     and holds the stage change logic that should be applied to the context's objects.

    This implementation is a placeholder that always raises; the stage change has to be supplied by an override.

    :param context: PhaseContext, context of current phase
    :raises NotImplementedError: Always.
    """
    raise NotImplementedError

YoloXTrainingStageSwitchCallback

Bases: TrainingStageSwitchCallbackBase

YoloXTrainingStageSwitchCallback

Training stage switch for YoloX training. Disables mosaic, and manipulates YoloX loss to use L1.

Source code in training/utils/callbacks/callbacks.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
@register_callback(Callbacks.YOLOX_TRAINING_STAGE_SWITCH)
class YoloXTrainingStageSwitchCallback(TrainingStageSwitchCallbackBase):
    """
    YoloXTrainingStageSwitchCallback

    Training stage switch for YoloX training.
    Closes every dataset transform that exposes a `close` method (disabling mosaic), and switches the YoloX loss to
    use L1.

    :param next_stage_start_epoch: Epoch idx to apply the stage change (default=285).
    """

    def __init__(self, next_stage_start_epoch: int = 285):
        super(YoloXTrainingStageSwitchCallback, self).__init__(next_stage_start_epoch=next_stage_start_epoch)

    def apply_stage_change(self, context: PhaseContext):
        """
        :param context: PhaseContext, context of current phase; its train_loader and criterion are modified.
        """
        dataset_transforms = context.train_loader.dataset.transforms
        for transform in dataset_transforms:
            if not hasattr(transform, "close"):
                continue
            transform.close()
        # NOTE(review): a fresh iterator is created and discarded - presumably so the loader picks up the closed
        # transforms; confirm.
        iter(context.train_loader)
        context.criterion.use_l1 = True

PPYoloETrainingStageSwitchCallback

Bases: TrainingStageSwitchCallbackBase

PPYoloETrainingStageSwitchCallback

Training stage switch for PPYolo training. It changes the static bbox assigner to a task-aligned assigner after a certain number of epochs have passed.

Source code in training/utils/callbacks/ppyoloe_switch_callback.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@register_callback(Callbacks.PPYOLOE_TRAINING_STAGE_SWITCH)
class PPYoloETrainingStageSwitchCallback(TrainingStageSwitchCallbackBase):
    """
    PPYoloETrainingStageSwitchCallback

    Training stage switch for PPYolo training.
    Once the configured epoch is reached, the criterion's `use_static_assigner` flag is turned off, i.e. the static
    bbox assigner is replaced by the task aligned assigner.

    :param static_assigner_end_epoch: Epoch idx at which the static assigner stops being used (default=30).
    """

    def __init__(
        self,
        static_assigner_end_epoch: int = 30,
    ):
        super().__init__(next_stage_start_epoch=static_assigner_end_epoch)

    def apply_stage_change(self, context: PhaseContext):
        """
        :param context: PhaseContext, context of current phase; its criterion is modified.
        :raises RuntimeError: when context.criterion is not a PPYoloELoss instance.
        """
        # Imported inside the method, as in the original implementation.
        from super_gradients.training.losses import PPYoloELoss

        criterion = context.criterion
        if isinstance(criterion, PPYoloELoss):
            criterion.use_static_assigner = False
            return

        raise RuntimeError(
            f"A criterion must be an instance of PPYoloELoss when using PPYoloETrainingStageSwitchCallback. Got criterion {repr(context.criterion)}"
        )

MissingPretrainedWeightsException

Bases: Exception

Exception raised by an unsupported pretrained model.

Parameters:

Name Type Description Default
desc

explanation of the error

required
Source code in training/utils/checkpoint_utils.py
253
254
255
256
257
258
259
260
261
class MissingPretrainedWeightsException(Exception):
    """Exception raised for a model that has no supported pretrained weights.

    The formatted text is stored on ``self.message`` and also passed to ``Exception``.

    :param desc: explanation of the error
    """

    def __init__(self, desc):
        message = "Missing pretrained wights: " + desc
        self.message = message
        super().__init__(message)

adapt_state_dict_to_fit_model_layer_names(model_state_dict, source_ckpt, exclude=[], solver=None)

Given a model state dict and source checkpoints, the method tries to correct the keys in the model_state_dict to fit the ckpt in order to properly load the weights into the model. If unsuccessful - returns None :param model_state_dict: the model state_dict :param source_ckpt: checkpoint dict :param exclude optional list for excluded layers :param solver: callable with signature (ckpt_key, ckpt_val, model_key, model_val) that returns a desired weight for ckpt_val. :return: renamed checkpoint dict (if possible)

Source code in training/utils/checkpoint_utils.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def adapt_state_dict_to_fit_model_layer_names(model_state_dict: dict, source_ckpt: dict, exclude: list = [], solver: callable = None):
    """
    Re-key the checkpoint weights with the model's layer names so they can be loaded into the model.

    Checkpoint entries and (non-excluded) model entries are paired by position, in iteration order.
    Pairing stops at the shorter of the two sequences.

        :param model_state_dict:        the model state_dict
        :param source_ckpt:             checkpoint dict; if it has a "net" entry, that entry is used
        :param exclude:                 optional list of substrings; model keys containing any of them are skipped
        :param solver:                  callable with signature (ckpt_key, ckpt_val, model_key, model_val)
                                        that returns a desired weight for ckpt_val.
        :return: dict of the form {"net": renamed checkpoint dict}
        :raises ValueError: when a paired checkpoint value and model value have different shapes
    """
    ckpt_weights = source_ckpt["net"] if "net" in source_ckpt.keys() else source_ckpt

    # Model entries that survive the exclusion filter, in their original order
    model_entries = [(name, tensor) for name, tensor in model_state_dict.items() if not any(pattern in name for pattern in exclude)]

    renamed_weights = {}
    for (ckpt_key, ckpt_val), (model_key, model_val) in zip(ckpt_weights.items(), model_entries):
        if solver is not None:
            ckpt_val = solver(ckpt_key, ckpt_val, model_key, model_val)
        if ckpt_val.shape == model_val.shape:
            renamed_weights[model_key] = ckpt_val
            continue
        raise ValueError(f"ckpt layer {ckpt_key} with shape {ckpt_val.shape} does not match {model_key}" f" with shape {model_val.shape} in the model")
    return {"net": renamed_weights}

adaptive_load_state_dict(net, state_dict, strict, solver=None)

Adaptively loads state_dict to net, by adapting the state_dict to net's layer names first.

Parameters:

Name Type Description Default
net torch.nn.Module

(nn.Module) to load state_dict to

required
state_dict dict

(dict) Checkpoint state_dict

required
strict Union[bool, StrictLoad]

(StrictLoad) key matching strictness

required
solver

callable with signature (ckpt_key, ckpt_val, model_key, model_val) that returns a desired weight for ckpt_val.

None

Returns:

Type Description
Source code in training/utils/checkpoint_utils.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def adaptive_load_state_dict(net: torch.nn.Module, state_dict: dict, strict: Union[bool, StrictLoad], solver=None):
    """
    Load state_dict into net; if the plain load fails, fall back according to the requested strictness.

    Fallbacks after a failed ``net.load_state_dict``:
      - StrictLoad.NO_KEY_MATCHING: re-key the checkpoint with the net's layer names and load it strictly.
      - StrictLoad.KEY_MATCHING: copy weights key by key via transfer_weights.
      - anything else: delegate to raise_informative_runtime_error.

    :param net: (nn.Module) to load state_dict to
    :param state_dict: (dict) Checkpoint state_dict; if it has a "net" entry, that entry is used
    :param strict: (bool or StrictLoad) key matching strictness
    :param solver: callable with signature (ckpt_key, ckpt_val, model_key, model_val)
                     that returns a desired weight for ckpt_val.
    :return: None
    """
    if "net" in state_dict:
        state_dict = state_dict["net"]

    try:
        if isinstance(strict, bool):
            strict_bool = strict
        else:
            strict_bool = strict != StrictLoad.OFF
        net.load_state_dict(state_dict, strict=strict_bool)
    except (RuntimeError, ValueError, KeyError) as ex:
        if strict == StrictLoad.NO_KEY_MATCHING:
            renamed = adapt_state_dict_to_fit_model_layer_names(net.state_dict(), state_dict, solver=solver)
            net.load_state_dict(renamed["net"], strict=True)
            return
        if strict == StrictLoad.KEY_MATCHING:
            transfer_weights(net, state_dict)
            return
        raise_informative_runtime_error(net.state_dict(), state_dict, ex)

copy_ckpt_to_local_folder(local_ckpt_destination_dir, ckpt_filename, remote_ckpt_source_dir=None, path_src='local', overwrite_local_ckpt=False, load_weights_only=False)

Copy the checkpoint from any supported source to a local destination path :param local_ckpt_destination_dir: destination where the checkpoint will be saved to :param ckpt_filename: ckpt_best.pth Or ckpt_latest.pth :param remote_ckpt_source_dir: Name of the source checkpoint to be loaded (S3 Model / full URL) :param path_src: S3 / url :param overwrite_local_ckpt: determines if checkpoint will be saved in destination dir or in a temp folder

:return: Path to checkpoint
Source code in training/utils/checkpoint_utils.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@explicit_params_validation(validation_type="None")
def copy_ckpt_to_local_folder(
    local_ckpt_destination_dir: str,
    ckpt_filename: str,
    remote_ckpt_source_dir: str = None,
    path_src: str = "local",
    overwrite_local_ckpt: bool = False,
    load_weights_only: bool = False,
):
    """
    Copy the checkpoint from any supported source to a local destination path
        :param local_ckpt_destination_dir:  destination where the checkpoint will be saved to
        :param ckpt_filename:         ckpt_best.pth Or ckpt_latest.pth
        :param remote_ckpt_source_dir:       Name of the source checkpoint to be loaded (S3 Model / full URL).
                                      Defaults to local_ckpt_destination_dir when None.
        :param path_src:              S3 / url. A value starting with "s3" downloads from S3, the value "url"
                                      downloads from a URL; for any other value nothing is downloaded.
        :param overwrite_local_ckpt:  determines if checkpoint will be saved in destination dir or in a temp folder
        :param load_weights_only:     when False (and the source is S3) the remote log files are copied as well

        :return: Path to checkpoint, or None when path_src is neither an S3 location nor "url"
    """
    ckpt_file_full_local_path = None

    # IF NOT DEFINED - IT IS SET TO THE TARGET's FOLDER NAME
    remote_ckpt_source_dir = local_ckpt_destination_dir if remote_ckpt_source_dir is None else remote_ckpt_source_dir

    if not overwrite_local_ckpt:
        # CREATE A TEMP FOLDER TO SAVE THE CHECKPOINT TO
        download_ckpt_destination_dir = tempfile.gettempdir()
        print(
            "PLEASE NOTICE - YOU ARE IMPORTING A REMOTE CHECKPOINT WITH overwrite_local_checkpoint = False "
            "-> IT WILL BE REDIRECTED TO A TEMP FOLDER AND DELETED ON MACHINE RESTART"
        )
    else:
        # SAVE THE CHECKPOINT TO MODEL's FOLDER
        download_ckpt_destination_dir = pkg_resources.resource_filename("checkpoints", local_ckpt_destination_dir)

    # The two sources are mutually exclusive ("url" never starts with "s3"), hence if/elif.
    if path_src.startswith("s3"):
        model_checkpoints_data_interface = ADNNModelRepositoryDataInterfaces(data_connection_location=path_src)
        # DOWNLOAD THE FILE FROM S3 TO THE DESTINATION FOLDER
        ckpt_file_full_local_path = model_checkpoints_data_interface.load_remote_checkpoints_file(
            ckpt_source_remote_dir=remote_ckpt_source_dir,
            ckpt_destination_local_dir=download_ckpt_destination_dir,
            ckpt_file_name=ckpt_filename,
            overwrite_local_checkpoints_file=overwrite_local_ckpt,
        )

        if not load_weights_only:
            # COPY LOG FILES FROM THE REMOTE DIRECTORY TO THE LOCAL ONE ONLY IF LOADING THE CURRENT MODELs CKPT
            model_checkpoints_data_interface.load_all_remote_log_files(
                model_name=remote_ckpt_source_dir, model_checkpoint_local_dir=download_ckpt_destination_dir
            )

    elif path_src == "url":
        ckpt_file_full_local_path = download_ckpt_destination_dir + os.path.sep + ckpt_filename
        # DOWNLOAD THE FILE FROM URL TO THE DESTINATION FOLDER
        download_url_to_file(remote_ckpt_source_dir, ckpt_file_full_local_path, progress=True)

    return ckpt_file_full_local_path

load_checkpoint_to_model(net, ckpt_local_path, load_backbone=False, strict=StrictLoad.NO_KEY_MATCHING, load_weights_only=False, load_ema_as_net=False, load_processing_params=False)

Loads the state dict in ckpt_local_path to net and returns the checkpoint's state dict.

Parameters:

Name Type Description Default
load_ema_as_net bool

Will load the EMA inside the checkpoint file to the network when set

False
ckpt_local_path str

local path to the checkpoint file

required
load_backbone bool

whether to load the checkpoint as a backbone

False
net torch.nn.Module

network to load the checkpoint to

required
strict Union[str, StrictLoad] StrictLoad.NO_KEY_MATCHING
load_weights_only bool

Whether to ignore all other entries other than "net".

False
load_processing_params bool

Whether to call set_dataset_processing_params on "processing_params" entry inside the checkpoint file (default=False).

False

Returns:

Type Description
Source code in training/utils/checkpoint_utils.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def load_checkpoint_to_model(
    net: torch.nn.Module,
    ckpt_local_path: str,
    load_backbone: bool = False,
    strict: Union[str, StrictLoad] = StrictLoad.NO_KEY_MATCHING,
    load_weights_only: bool = False,
    load_ema_as_net: bool = False,
    load_processing_params: bool = False,
):
    """
    Loads the state dict in ckpt_local_path to net and returns the checkpoint's state dict.


    :param load_ema_as_net: Will load the EMA inside the checkpoint file to the network when set
    :param ckpt_local_path: local path to the checkpoint file
    :param load_backbone: whether to load the checkpoint as a backbone
    :param net: network to load the checkpoint to
    :param strict: key matching strictness; a string is converted with StrictLoad(strict)
    :param load_weights_only: Whether to ignore all other entries other than "net".
    :param load_processing_params: Whether to call set_dataset_processing_params on "processing_params" entry inside the
     checkpoint file (default=False).
    :return: the checkpoint dict that was read; when load_weights_only or load_backbone is set,
     only its "net" entry is kept
    :raises RuntimeError: when ckpt_local_path is None or does not exist
    :raises ValueError: when load_backbone is set but net has no "backbone" attribute, when load_ema_as_net is
     set but the checkpoint has no "ema_net" entry, or when processing params are requested but missing
    """
    if isinstance(strict, str):
        strict = StrictLoad(strict)

    if ckpt_local_path is None or not os.path.exists(ckpt_local_path):
        error_msg = "Error - loading Model Checkpoint: Path {} does not exist".format(ckpt_local_path)
        raise RuntimeError(error_msg)

    if load_backbone and not hasattr(net, "backbone"):
        raise ValueError("No backbone attribute in net - Can't load backbone weights")

    # LOAD THE LOCAL CHECKPOINT PATH INTO A state_dict OBJECT
    checkpoint = read_ckpt_state_dict(ckpt_path=ckpt_local_path)

    if load_ema_as_net:
        if "ema_net" not in checkpoint.keys():
            raise ValueError("Can't load ema network- no EMA network stored in checkpoint file")
        else:
            checkpoint["net"] = checkpoint["ema_net"]

    # LOAD THE CHECKPOINTS WEIGHTS TO THE MODEL
    if load_backbone:
        adaptive_load_state_dict(net.backbone, checkpoint, strict)
    else:
        adaptive_load_state_dict(net, checkpoint, strict)

    message_suffix = " checkpoint." if not load_ema_as_net else " EMA checkpoint."
    message_model = "model" if not load_backbone else "model's backbone"
    logger.info("Successfully loaded " + message_model + " weights from " + ckpt_local_path + message_suffix)

    if (isinstance(net, HasPredict) or (hasattr(net, "module") and isinstance(net.module, HasPredict))) and load_processing_params:
        if "processing_params" not in checkpoint.keys():
            raise ValueError("Can't load processing params - could not find any stored in checkpoint file.")
        try:
            net.set_dataset_processing_params(**checkpoint["processing_params"])
        except Exception as e:
            # Best effort: a failure here is reported as a warning and does not abort the load.
            # The trailing space after "calling" keeps the two adjacent literals from fusing into "callingpredict".
            logger.warning(
                f"Could not set preprocessing pipeline from the checkpoint dataset: {e}. Before calling "
                "predict make sure to call set_dataset_processing_params."
            )

    if load_weights_only or load_backbone:
        # DISCARD ALL THE DATA STORED IN CHECKPOINT OTHER THAN THE WEIGHTS
        # Iterate over a snapshot of the keys because the dict is mutated in place.
        for key in list(checkpoint.keys()):
            if key != "net":
                checkpoint.pop(key)

    return checkpoint

load_pretrained_weights(model, architecture, pretrained_weights)

Loads pretrained weights from the MODEL_URLS dictionary to model

Parameters:

Name Type Description Default
architecture str

name of the model's architecture

required
model torch.nn.Module

model to load pretrained weights for

required
pretrained_weights str

name for the pretrained weights (e.g. imagenet)

required

Returns:

Type Description

None

Source code in training/utils/checkpoint_utils.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def load_pretrained_weights(model: torch.nn.Module, architecture: str, pretrained_weights: str):
    """
    Download the pretrained weights registered in the MODEL_URLS dictionary and load them into model.

    The lookup key is "<architecture>_<pretrained_weights>".

    :param architecture: name of the model's architecture
    :param model: model to load pretrained weights for
    :param pretrained_weights: name for the pretrained weights (e.g. imagenet)
    :return: None
    :raises MissingPretrainedWeightsException: when the lookup key is not present in MODEL_URLS
    """
    from super_gradients.common.object_names import Models

    model_url_key = "_".join((architecture, str(pretrained_weights)))
    if model_url_key in MODEL_URLS.keys():
        url = MODEL_URLS[model_url_key]
    else:
        raise MissingPretrainedWeightsException(model_url_key)

    yolo_nas_architectures = (Models.YOLO_NAS_S, Models.YOLO_NAS_M, Models.YOLO_NAS_L)
    if architecture in yolo_nas_architectures:
        logger.info(
            "License Notification: YOLO-NAS pre-trained weights are subjected to the specific license terms and conditions detailed in \n"
            "https://github.com/Deci-AI/super-gradients/blob/master/LICENSE.YOLONAS.md\n"
            "By downloading the pre-trained weight files you agree to comply with these terms."
        )

    # Build a flat file name from the part of the URL that follows the models root
    path_after_root = url.split("https://sghub.deci.ai/models/")[1]
    unique_filename = path_after_root.replace("/", "_").replace(" ", "_")

    pretrained_state_dict = load_state_dict_from_url(url=url, map_location=torch.device("cpu"), file_name=unique_filename)
    _load_weights(architecture, model, pretrained_state_dict)

load_pretrained_weights_local(model, architecture, pretrained_weights)

Loads pretrained weights from a local checkpoint file to model

Parameters:

Name Type Description Default
architecture str

name of the model's architecture

required
model torch.nn.Module

model to load pretrained weights for

required
pretrained_weights str

path to pretrained weights

required

Returns:

Type Description

None

Source code in training/utils/checkpoint_utils.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def load_pretrained_weights_local(model: torch.nn.Module, architecture: str, pretrained_weights: str):
    """
    Loads pretrained weights from a local checkpoint file to model

    :param architecture: name of the model's architecture
    :param model: model to load pretrained weights for
    :param pretrained_weights: path to pretrained weights
    :return: None
    """
    # The checkpoint is always mapped to CPU when it is read.
    # NOTE(review): torch.load unpickles the file - only pass checkpoint paths you trust.
    pretrained_state_dict = torch.load(pretrained_weights, map_location=torch.device("cpu"))
    _load_weights(architecture, model, pretrained_state_dict)

raise_informative_runtime_error(state_dict, checkpoint, exception_msg)

Given a model state dict and source checkpoints, the method calls "adapt_state_dict_to_fit_model_layer_names" and enhances the exception_msg if loading the checkpoint_dict via the conversion method is possible

Source code in training/utils/checkpoint_utils.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def raise_informative_runtime_error(state_dict, checkpoint, exception_msg):
    """
    Given a model state dict and source checkpoints, the method calls "adapt_state_dict_to_fit_model_layer_names"
    and enhances the exception_msg if loading the checkpoint_dict via the conversion method is possible

    :param state_dict: the model state dict
    :param checkpoint: the source checkpoint dict
    :param exception_msg: the original exception (or message); it is embedded via str() in the enhanced message
    :raises RuntimeError: always - this function never returns normally
    """
    try:
        new_ckpt_dict = adapt_state_dict_to_fit_model_layer_names(state_dict, checkpoint)
        # NOTE(review): the NamedTemporaryFile object itself is not kept; only its generated name, with ".pt"
        # appended, is used as the path the converted checkpoint is saved to.
        temp_file = tempfile.NamedTemporaryFile().name + ".pt"
        torch.save(new_ckpt_dict, temp_file)
        # Conversion worked: point the user at the converted checkpoint file
        exception_msg = (
            f"\n{'=' * 200}\n{str(exception_msg)} \nconvert ckpt via the utils.adapt_state_dict_to_fit_"
            f"model_layer_names method\na converted checkpoint file was saved in the path {temp_file}\n{'=' * 200}"
        )
    except ValueError as ex:  # IN CASE adapt_state_dict_to_fit_model_layer_names WAS UNSUCCESSFUL
        exception_msg = f"\n{'=' * 200} \nThe checkpoint and model shapes do no fit, e.g.: {ex}\n{'=' * 200}"
    finally:
        # Raising from "finally" means a RuntimeError leaves this function on every path, including when
        # the try block failed with an exception other than ValueError.
        raise RuntimeError(exception_msg)

transfer_weights(model, model_state_dict)

Copy weights from model_state_dict to model, skipping layers that are incompatible (Having different shape). This method is helpful if you are doing some model surgery and want to load part of the model weights into different model. This function will go over all the layers in model_state_dict and will try to find a matching layer in model and copy the weights into it. If shape will not match, the layer will be skipped.

Parameters:

Name Type Description Default
model nn.Module

Model to load weights into

required
model_state_dict Mapping[str, Tensor]

Model state dict to load weights from

required

Returns:

Type Description
None

None

Source code in training/utils/checkpoint_utils.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def transfer_weights(model: nn.Module, model_state_dict: Mapping[str, Tensor]) -> None:
    """
    Load the entries of `model_state_dict` into `model` one at a time, ignoring entries that fail to load.

    Each entry is loaded on its own with `strict=False`; when loading an entry raises a RuntimeError
    (e.g. the shapes differ) that entry is skipped and the remaining ones are still processed.
    Useful for model surgery, when only part of the weights fit the target model.

    :param model: Model to load weights into
    :param model_state_dict: Model state dict to load weights from
    :return: None
    """
    for layer_name in model_state_dict:
        # A one-entry state dict, so a failure affects only this layer
        single_layer_state = collections.OrderedDict()
        single_layer_state[layer_name] = model_state_dict[layer_name]
        try:
            model.load_state_dict(single_layer_state, strict=False)
        except RuntimeError:
            continue

AccessCounterMixin

Implements access counting mechanism for configuration settings (dicts/lists). It is achieved by wrapping underlying config and override getitem, getattr methods to catch read operations and increments access counter for each property.

Source code in training/utils/config_utils.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class AccessCounterMixin:
    """
    Mixin that counts read accesses to configuration settings (dicts/lists).
    Values handed out through `maybe_wrap_as_counter` are wrapped in counting adapters that share the same
    access counter, so reads of nested properties are recorded under a dotted key.
    """

    _access_counter: Mapping[str, int]
    _prefix: str  # Prefix string

    def maybe_wrap_as_counter(self, value, key, count_usage: bool = True):
        """
        Return an attribute value optionally wrapped as access counter adapter to trace read counts.

        :param value: Attribute value
        :param key: Attribute name
        :param count_usage: Whether increment usage count for given attribute. Default is True.

        :return: wrapped value
        """
        full_key = self._prefix + str(key)
        if count_usage:
            self._access_counter[full_key] += 1

        # Mappings take precedence over generic iterables; strings are returned as-is
        if isinstance(value, Mapping):
            wrapper_cls = AccessCounterDict
        elif isinstance(value, Iterable) and not isinstance(value, str):
            wrapper_cls = AccessCounterList
        else:
            return value
        return wrapper_cls(value, access_counter=self._access_counter, prefix=full_key + ".")

    @property
    def access_counter(self):
        return self._access_counter

    @abc.abstractmethod
    def get_all_params(self) -> Set[str]:
        raise NotImplementedError()

    def get_used_params(self) -> Set[str]:
        # A parameter counts as used once its counter is positive
        return {name for name, hits in self._access_counter.items() if hits > 0}

    def get_unused_params(self) -> Set[str]:
        return self.get_all_params() - self.get_used_params()

    def __copy__(self):
        # Shallow copy: the new instance shares attribute objects with the original
        clone = self.__class__.__new__(self.__class__)
        vars(clone).update(vars(self))
        return clone

    def __deepcopy__(self, memo):
        duplicate = self.__class__.__new__(self.__class__)
        # Register the copy before descending so self-references resolve to it
        memo[id(self)] = duplicate
        for attr_name, attr_value in vars(self).items():
            setattr(duplicate, attr_name, deepcopy(attr_value, memo))
        return duplicate

maybe_wrap_as_counter(value, key, count_usage=True)

Return an attribute value optionally wrapped as access counter adapter to trace read counts.

Parameters:

Name Type Description Default
value

Attribute value

required
key

Attribute name

required
count_usage bool

Whether increment usage count for given attribute. Default is True.

True

Returns:

Type Description

wrapped value

Source code in training/utils/config_utils.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def maybe_wrap_as_counter(self, value, key, count_usage: bool = True):
    """
    Return an attribute value optionally wrapped as access counter adapter to trace read counts.

    :param value: Attribute value
    :param key: Attribute name
    :param count_usage: Whether increment usage count for given attribute. Default is True.

    :return: wrapped value
    """
    full_key = self._prefix + str(key)
    if count_usage:
        self._access_counter[full_key] += 1

    # Mappings take precedence over generic iterables; strings are returned as-is
    if isinstance(value, Mapping):
        wrapper_cls = AccessCounterDict
    elif isinstance(value, Iterable) and not isinstance(value, str):
        wrapper_cls = AccessCounterList
    else:
        return value
    return wrapper_cls(value, access_counter=self._access_counter, prefix=full_key + ".")

raise_if_unused_params(config)

A helper function to check whether all configuration parameters were used in a given block of code. The motivation for this check is to ensure there are no typos or outdated configuration parameters. If at least one of the config parameters was not used, this function will raise an UnusedConfigParamException exception. Example usage:

from super_gradients.training.utils import raise_if_unused_params

with raise_if_unused_params(some_config) as some_config: do_something_with_config(some_config)

Parameters:

Name Type Description Default
config Union[HpmStruct, DictConfig, ListConfig, Mapping, list, tuple]

A config to check

required

Returns:

Type Description
ConfigInspector

An instance of ConfigInspector

Source code in training/utils/config_utils.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def raise_if_unused_params(config: Union[HpmStruct, DictConfig, ListConfig, Mapping, list, tuple]) -> ConfigInspector:
    """
    A helper function to check whether all configuration parameters were used in a given block of code.
    The motivation for this check is to ensure there are no typos or outdated configuration parameters.
    If at least one of the config parameters was not used, an UnusedConfigParamException is raised.
    Example usage:

    >>> from super_gradients.training.utils import raise_if_unused_params
    >>>
    >>> with raise_if_unused_params(some_config) as some_config:
    >>>    do_something_with_config(some_config)
    >>>

    :param config: A config to check
    :return: An instance of ConfigInspector
    """
    # HpmStruct is tested first, then mappings, then list-like containers
    if isinstance(config, HpmStruct):
        return ConfigInspector(AccessCounterHpmStruct(config), unused_params_action="raise")
    if isinstance(config, (Mapping, DictConfig)):
        return ConfigInspector(AccessCounterDict(config), unused_params_action="raise")
    if isinstance(config, (list, tuple, ListConfig)):
        return ConfigInspector(AccessCounterList(config), unused_params_action="raise")

    raise RuntimeError(f"Unsupported type. Root configuration object must be a mapping or list. Got type {type(config)}")

warn_if_unused_params(config)

A helper function to check whether all configuration parameters were used in a given block of code. The motivation for this check is to ensure there are no typos or outdated configuration parameters. If at least one of the config parameters was not used, this function will emit a warning. Example usage:

from super_gradients.training.utils import warn_if_unused_params

with warn_if_unused_params(some_config) as some_config: do_something_with_config(some_config)

Parameters:

Name Type Description Default
config

A config to check

required

Returns:

Type Description

An instance of ConfigInspector

Source code in training/utils/config_utils.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def warn_if_unused_params(config: Union[HpmStruct, DictConfig, ListConfig, Mapping, list, tuple]) -> ConfigInspector:
    """
    A helper function to check whether all configuration parameters were used in a given block of code.
    The motivation for this check is to ensure there are no typos or outdated configuration parameters.
    If at least one of the config parameters was not used, this function will emit a warning.
    Example usage:

    >>> from super_gradients.training.utils import warn_if_unused_params
    >>>
    >>> with warn_if_unused_params(some_config) as some_config:
    >>>    do_something_with_config(some_config)
    >>>

    :param config: A config to check
    :return: An instance of ConfigInspector
    :raises RuntimeError: when config is neither an HpmStruct, a mapping nor a list-like container
    """
    if isinstance(config, HpmStruct):
        wrapper_cls = AccessCounterHpmStruct
    elif isinstance(config, (Mapping, DictConfig)):
        wrapper_cls = AccessCounterDict
    elif isinstance(config, (list, tuple, ListConfig)):
        wrapper_cls = AccessCounterList
    else:
        # Same diagnostic as raise_if_unused_params: report the offending type to ease debugging.
        raise RuntimeError(f"Unsupported type. Root configuration object must be a mapping or list. Got type {type(config)}")

    return ConfigInspector(wrapper_cls(config), unused_params_action="warn")

wrap_with_warning(cls, message)

Emits a warning when the target class or function is called.

from super_gradients.training.utils.deprecated_utils import wrap_with_warning from super_gradients.training.utils.callbacks import EpochStepWarmupLRCallback, BatchStepLinearWarmupLRCallback

LR_WARMUP_CLS_DICT = { "linear": wrap_with_warning( EpochStepWarmupLRCallback, message=f"Parameter linear has been made deprecated and will be removed in the next SG release. Please use linear_epoch instead", ), 'linear_epoch': EpochStepWarmupLRCallback, }

Parameters:

Name Type Description Default
cls Callable

A class or function to wrap

required
message str

A message to emit when this class is called

required

Returns:

Type Description
Any

A factory method that returns wrapped class

Source code in training/utils/deprecated_utils.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def wrap_with_warning(cls: Callable, message: str) -> Any:
    """
    Build a factory that logs a warning each time the target class or function is called.

    >>> from super_gradients.training.utils.deprecated_utils import wrap_with_warning
    >>> from super_gradients.training.utils.callbacks import EpochStepWarmupLRCallback, BatchStepLinearWarmupLRCallback
    >>>
    >>> LR_WARMUP_CLS_DICT = {
    >>>     "linear": wrap_with_warning(
    >>>         EpochStepWarmupLRCallback,
    >>>         message=f"Parameter `linear` has been made deprecated and will be removed in the next SG release. Please use `linear_epoch` instead",
    >>>     ),
    >>>     'linear_epoch': EpochStepWarmupLRCallback,
    >>> }

    :param cls: A class or function to wrap
    :param message: A message to emit when this class is called
    :return: A factory method that returns wrapped class
    """

    def _inner_fn(*args, **kwargs):
        # The warning is emitted on every call, before delegating to the wrapped callable
        logger.warning(message)
        result = cls(*args, **kwargs)
        return result

    return _inner_fn

Anchors

Bases: nn.Module

A wrapper function to hold the anchors used by detection models such as Yolo

Source code in training/utils/detection_utils.py
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
class Anchors(nn.Module):
    """
    Module that holds the anchors used by detection models such as Yolo
    """

    def __init__(self, anchors_list: List[List], strides: List[int]):
        """
        :param anchors_list: of the shape [[w1,h1,w2,h2,w3,h3], [w4,h4,w5,h5,w6,h6] ....] where each sublist holds
            the width and height of the anchors of a specific detection layer.
            i.e. for a model with 3 detection layers, each containing 5 anchors the format will be a list of
            3 sublists of 10 numbers each
            The width and height are in pixels (not relative to image size)
        :param strides: a list containing the stride of the layers from which the detection heads are fed.
            i.e. if the first detection head is connected to the backbone after the input dimensions were reduced by 8,
            the first number will be 8
        """
        super().__init__()

        self.__anchors_list = anchors_list
        self.__strides = strides

        self._check_all_lists(anchors_list)
        self._check_all_len_equal_and_even(anchors_list)

        num_layers = len(anchors_list)

        stride_tensor = torch.Tensor(strides).float()
        self._stride = nn.Parameter(stride_tensor, requires_grad=False)

        # One (width, height) pair per anchor, grouped by detection layer
        anchors_wh = torch.Tensor(anchors_list).float().view(num_layers, -1, 2)
        # Anchors divided by the stride of their layer
        self._anchors = nn.Parameter(anchors_wh / self._stride.view(-1, 1, 1), requires_grad=False)
        # Un-scaled copy of the anchors with extra singleton dimensions
        self._anchor_grid = nn.Parameter(anchors_wh.clone().view(num_layers, 1, -1, 1, 1, 2), requires_grad=False)

    @staticmethod
    def _check_all_lists(anchors: list) -> bool:
        if not all(isinstance(a, (list, ListConfig)) for a in anchors):
            raise RuntimeError("All objects of anchors_list must be lists")

    @staticmethod
    def _check_all_len_equal_and_even(anchors: list) -> bool:
        expected_len = len(anchors[0])
        for layer_anchors in anchors:
            layer_len = len(layer_anchors)
            if layer_len != expected_len or layer_len % 2 == 1:
                raise RuntimeError("All objects of anchors_list must be of the same even length")

    @property
    def stride(self) -> nn.Parameter:
        return self._stride

    @property
    def anchors(self) -> nn.Parameter:
        return self._anchors

    @property
    def anchor_grid(self) -> nn.Parameter:
        return self._anchor_grid

    @property
    def detection_layers_num(self) -> int:
        # First dimension of the anchors tensor: one entry per detection layer
        return self._anchors.shape[0]

    @property
    def num_anchors(self) -> int:
        # Second dimension of the anchors tensor: anchors per detection layer
        return self._anchors.shape[1]

    def __repr__(self):
        return f"anchors_list: {self.__anchors_list} strides: {self.__strides}"

__init__(anchors_list, strides)

Parameters:

Name Type Description Default
anchors_list List[List]

of the shape [[w1,h1,w2,h2,w3,h3], [w4,h4,w5,h5,w6,h6] ....] where each sublist holds the width and height of the anchors of a specific detection layer. i.e. for a model with 3 detection layers, each containing 5 anchors the format will be a list of 3 sublists of 10 numbers each. The width and height are in pixels (not relative to image size)

required
strides List[int]

a list containing the stride of the layers from which the detection heads are fed. i.e. if the first detection head is connected to the backbone after the input dimensions were reduced by 8, the first number will be 8

required
Source code in training/utils/detection_utils.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
def __init__(self, anchors_list: List[List], strides: List[int]):
    """
    Validate the anchors and register them, together with the strides, as non-trainable parameters.

    :param anchors_list: of the shape [[w1,h1,w2,h2,w3,h3], [w4,h4,w5,h5,w6,h6], ...] where each sublist holds
        the width and height of the anchors of a specific detection layer,
        i.e. for a model with 3 detection layers, each containing 5 anchors, the format will be a list of
        3 sublists of 10 numbers each.
        The width and height are in pixels (not relative to image size)
    :param strides: a list containing the stride of the layers from which the detection heads are fed,
        i.e. if the first detection head is connected to the backbone after the input dimensions were reduced by 8,
        the first number will be 8
    """
    super().__init__()

    # Raw constructor arguments are kept for __repr__
    self.__anchors_list = anchors_list
    self.__strides = strides

    # Both checks raise RuntimeError on malformed input
    self._check_all_lists(anchors_list)
    self._check_all_len_equal_and_even(anchors_list)

    num_layers = len(anchors_list)
    stride_values = torch.Tensor(strides).float()
    self._stride = nn.Parameter(stride_values, requires_grad=False)

    # (num_layers, anchors_per_layer, 2): one (w, h) pair per anchor
    anchors_wh = torch.Tensor(anchors_list).float().view(num_layers, -1, 2)

    # Anchors expressed relative to the stride of their own detection layer
    strides_per_layer = self._stride.view(-1, 1, 1)
    self._anchors = nn.Parameter(anchors_wh / strides_per_layer, requires_grad=False)

    # Anchors as given, reshaped with singleton axes
    grid = anchors_wh.clone().view(num_layers, 1, -1, 1, 1, 2)
    self._anchor_grid = nn.Parameter(grid, requires_grad=False)

CrowdDetectionCollateFN

Bases: DetectionCollateFN

Collate function for Yolox training with additional_batch_items that includes crowd targets

Source code in training/utils/detection_utils.py
779
780
781
782
783
784
785
786
787
788
@register_collate_function()
class CrowdDetectionCollateFN(DetectionCollateFN):
    """
    Collate function for Yolox training that additionally returns crowd targets
    under the "crowd_targets" key of an extra dictionary (additional_batch_items).
    """

    def __call__(self, data) -> Tuple[torch.Tensor, torch.Tensor, Dict[str, torch.Tensor]]:
        """
        Collate samples and format both the regular and the crowd targets.

        :param data: list of samples; only the first three collated items (images, targets, crowd targets) are used
        :return: (images, formatted targets, {"crowd_targets": formatted crowd targets})
        """
        collated = default_collate(data)
        images, targets, crowd_targets = collated[:3]

        formatted_targets = self._format_targets(targets)
        formatted_crowd = self._format_targets(crowd_targets)
        return images, formatted_targets, {"crowd_targets": formatted_crowd}

CrowdDetectionPPYoloECollateFN

Bases: PPYoloECollateFN

Collate function for PPYoloE training with additional_batch_items that includes crowd targets

Source code in training/utils/detection_utils.py
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
class CrowdDetectionPPYoloECollateFN(PPYoloECollateFN):
    """
    Collate function for PPYoloE training with additional_batch_items that includes crowd targets
    """

    def __call__(self, data) -> Tuple[torch.Tensor, torch.Tensor, Dict[str, torch.Tensor]]:
        """
        Optionally random-resize the samples, collate them and format the regular and crowd targets.

        :param data: list of (image, targets, crowd_targets) samples
        :return: (images, formatted targets, {"crowd_targets": formatted crowd targets})
        """
        # Random resize is applied only when sizes were configured
        samples = data if self.random_resize_sizes is None else self.random_resize(data)

        images, targets, crowd_targets = default_collate(samples)

        # A size-3 last axis is treated as the channel axis: move it to dim 1 and cast to float
        if images.shape[3] == 3:
            images = torch.moveaxis(images, -1, 1).float()

        formatted_targets = self._format_targets(targets)
        formatted_crowd = self._format_targets(crowd_targets)
        return images, formatted_targets, {"crowd_targets": formatted_crowd}

DetectionCollateFN

Collate function for Yolox training

Source code in training/utils/detection_utils.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
@register_collate_function()
class DetectionCollateFN:
    """
    Collate function for Yolox training
    """

    def __call__(self, data) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Collate samples with default_collate and flatten the targets of the whole batch.

        :param data: list of samples; only the first two collated items (images, targets) are used
        :return: (images, formatted targets)
        """
        collated = default_collate(data)
        images, targets = collated[:2]
        return images, self._format_targets(targets)

    def _format_targets(self, targets: torch.Tensor) -> torch.Tensor:
        """
        Merge per-image targets into a single 2D tensor, prefixing every row with its image index.

        For every image, the number of rows whose element sum is positive is counted and that many
        leading rows are kept (the remaining rows are dropped).

        :param targets: 3D tensor indexed as (image, target row, target values)
        :return: 2D tensor of all kept rows, each prefixed with a column holding the image index in the batch
        """
        labels_per_image = (targets.sum(dim=2) > 0).sum(dim=1)

        merged_rows = []
        batch_size = targets.shape[0]
        for image_idx in range(batch_size):
            kept = targets[image_idx, : labels_per_image[image_idx]]
            index_column = targets.new_ones((kept.shape[0], 1)) * image_idx
            merged_rows.append(torch.cat((index_column, kept), 1))

        return torch.cat(merged_rows, 0)

DetectionPostPredictionCallback

Bases: ABC, nn.Module

Source code in training/utils/detection_utils.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class DetectionPostPredictionCallback(ABC, nn.Module):
    """
    Abstract base class (also an nn.Module) for callbacks that turn raw model output into detections.
    Subclasses must implement forward().
    """

    def __init__(self) -> None:
        super().__init__()

    @abstractmethod
    def forward(self, x, device: str):
        """
        Convert the model output into detections. Must be overridden; this base implementation only raises.

        :param x:       the output of your model
        :param device:  the device to move all output tensors into
        :return:        a list with length batch_size, each item in the list holds detections
                        with shape: nx6 (x1, y1, x2, y2, confidence, class) where x and y are in range [0,1]
        :raises NotImplementedError: always, when called on the base implementation
        """
        raise NotImplementedError

forward(x, device) abstractmethod

Parameters:

Name Type Description Default
x

the output of your model

required
device str

the device to move all output tensors into

required

Returns:

Type Description

a list with length batch_size, each item in the list holds detections with shape: nx6 (x1, y1, x2, y2, confidence, class) where x and y are in range [0,1]

Source code in training/utils/detection_utils.py
186
187
188
189
190
191
192
193
194
195
@abstractmethod
def forward(self, x, device: str):
    """
    Convert the model output into detections. Must be overridden; this base implementation only raises.

    :param x:       the output of your model
    :param device:  the device to move all output tensors into
    :return:        a list with length batch_size, each item in the list holds detections
                    with shape: nx6 (x1, y1, x2, y2, confidence, class) where x and y are in range [0,1]
    :raises NotImplementedError: always, when called on the base implementation
    """
    raise NotImplementedError

DetectionTargetsFormat

Bases: Enum

Enum class for the different detection output formats

When NORMALIZED is not specified- the type refers to unnormalized image coordinates (of the bboxes).

For example: LABEL_NORMALIZED_XYXY means [class_idx,x1,y1,x2,y2]

Source code in training/utils/detection_utils.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class DetectionTargetsFormat(Enum):
    """
    Enum class for the different detection output formats

    When NORMALIZED is not specified, the type refers to unnormalized image coordinates (of the bboxes).

    For example:
    LABEL_NORMALIZED_XYXY means [class_idx,x1,y1,x2,y2]

    Every member's value is the same string as its name.
    """

    # NOTE(review): the member names presumably spell out the column order (label first vs. label last,
    # XYXY corners vs. CXCYWH center/size) - confirm against the code that converts between formats.
    LABEL_XYXY = "LABEL_XYXY"
    XYXY_LABEL = "XYXY_LABEL"
    LABEL_NORMALIZED_XYXY = "LABEL_NORMALIZED_XYXY"
    NORMALIZED_XYXY_LABEL = "NORMALIZED_XYXY_LABEL"
    LABEL_CXCYWH = "LABEL_CXCYWH"
    CXCYWH_LABEL = "CXCYWH_LABEL"
    LABEL_NORMALIZED_CXCYWH = "LABEL_NORMALIZED_CXCYWH"
    NORMALIZED_CXCYWH_LABEL = "NORMALIZED_CXCYWH_LABEL"

DetectionVisualization

Source code in training/utils/detection_utils.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
class DetectionVisualization:
    """
    Static helpers that draw predicted and ground truth detection boxes on a batch of images.
    The public entry point is visualize_batch().
    """

    @staticmethod
    def _generate_color_mapping(num_classes: int) -> List[Tuple[int]]:
        """
        Generate a unique BGR color for each class

        :param num_classes: number of classes to generate colors for
        :return:            the result of generate_color_mapping(num_classes=num_classes)
        """

        return generate_color_mapping(num_classes=num_classes)

    @staticmethod
    def _draw_box_title(
        color_mapping: List[Tuple[int]],
        class_names: List[str],
        box_thickness: int,
        image_np: np.ndarray,
        x1: int,
        y1: int,
        x2: int,
        y2: int,
        class_id: int,
        pred_conf: float = None,
        is_target: bool = False,
    ):
        """
        Draw a single titled box on image_np using draw_bbox and return the image.

        :param color_mapping:   color per class, indexed by class id
        :param class_names:     name per class, indexed by class id
        :param box_thickness:   box line thickness, forwarded to draw_bbox
        :param image_np:        image to draw on
        :param x1, y1, x2, y2:  box corners, forwarded to draw_bbox
        :param class_id:        index into color_mapping and class_names
        :param pred_conf:       confidence appended to the title of a prediction (rounded to 2 decimals);
                                omitted from the title when None
        :param is_target:       if True the title is "[GT] <class name>", otherwise "[Pred] <class name>  <conf>"
        :return:                image_np
        """
        color = color_mapping[class_id]
        class_name = class_names[class_id]

        if is_target:
            title = f"[GT] {class_name}"
        else:
            title = f'[Pred] {class_name}  {str(round(pred_conf, 2)) if pred_conf is not None else ""}'

        draw_bbox(image=image_np, title=title, x1=x1, y1=y1, x2=x2, y2=y2, box_thickness=box_thickness, color=color)
        return image_np

    @staticmethod
    def _visualize_image(
        image_np: np.ndarray,
        pred_boxes: np.ndarray,
        target_boxes: np.ndarray,
        class_names: List[str],
        box_thickness: int,
        gt_alpha: float,
        image_scale: float,
        checkpoint_dir: str,
        image_name: str,
    ):
        """
        Draw predictions and ground truth boxes on a single image.

        Note: the first four columns of pred_boxes are multiplied by image_scale IN PLACE.

        :param image_np:        image to draw on; it is resized by image_scale first
        :param pred_boxes:      (Num_boxes, 6) array: x1, y1, x2, y2, confidence, class
        :param target_boxes:    integer array whose columns are: image id, class, x1, y1, x2, y2
                                (already scaled by image_scale)
        :param class_names:     names of all classes, each on its own index
        :param box_thickness:   box line thickness in px
        :param gt_alpha:        weight of the ground truth overlay when blending it with the image
        :param image_scale:     resize factor applied to the image and to the predicted boxes
        :param checkpoint_dir:  directory to save the image into (created if missing); if None nothing is saved
        :param image_name:      file name (without extension) used when saving
        :return:                the resulting image when checkpoint_dir is None, otherwise None
                                (the image is written to <checkpoint_dir>/<image_name>.jpg instead)
        """
        image_np = cv2.resize(image_np, (0, 0), fx=image_scale, fy=image_scale, interpolation=cv2.INTER_NEAREST)
        color_mapping = DetectionVisualization._generate_color_mapping(len(class_names))

        # Draw predictions
        pred_boxes[:, :4] *= image_scale
        for box in pred_boxes:
            image_np = DetectionVisualization._draw_box_title(
                color_mapping, class_names, box_thickness, image_np, *box[:4].astype(int), class_id=int(box[5]), pred_conf=box[4]
            )

        # Draw ground truths on a separate black canvas so they can be blended in afterwards
        target_boxes_image = np.zeros_like(image_np, np.uint8)
        for box in target_boxes:
            target_boxes_image = DetectionVisualization._draw_box_title(
                color_mapping, class_names, box_thickness, target_boxes_image, *box[2:], class_id=box[1], is_target=True
            )

        # Transparent overlay of ground truth boxes: blend only where something was drawn on the canvas
        mask = target_boxes_image.astype(bool)
        image_np[mask] = cv2.addWeighted(image_np, 1 - gt_alpha, target_boxes_image, gt_alpha, 0)[mask]

        if checkpoint_dir is None:
            return image_np
        else:
            pathlib.Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)
            cv2.imwrite(os.path.join(checkpoint_dir, str(image_name) + ".jpg"), image_np)

    @staticmethod
    def _scaled_ccwh_to_xyxy(target_boxes: np.ndarray, h: int, w: int, image_scale: float) -> np.ndarray:
        """
        Convert target boxes from scaled center/size format to integer corner format.
        Modifies target_boxes inplace (the returned array is an integer copy of the modified input).

        :param target_boxes:    array whose columns 2-5 hold (c1, c2, w, h) boxes in [0, 1] range;
                                columns 0-1 are left untouched
        :param h:               image height
        :param w:               image width
        :param image_scale:     desired scale for the boxes w.r.t. w and h
        :return:                targets with columns 2-5 in (x1, y1, x2, y2) format
                                in range [0, w * image_scale] [0, h * image_scale], cast to int
        """
        # unscale
        target_boxes[:, 2:] *= np.array([[w, h, w, h]])

        # x1 = c1 - w // 2; y1 = c2 - h // 2
        target_boxes[:, 2] -= target_boxes[:, 4] // 2
        target_boxes[:, 3] -= target_boxes[:, 5] // 2
        # x2 = w + x1; y2 = h + y1
        target_boxes[:, 4] += target_boxes[:, 2]
        target_boxes[:, 5] += target_boxes[:, 3]

        target_boxes[:, 2:] *= image_scale
        target_boxes = target_boxes.astype(int)
        return target_boxes

    @staticmethod
    def visualize_batch(
        image_tensor: torch.Tensor,
        pred_boxes: List[torch.Tensor],
        target_boxes: torch.Tensor,
        batch_name: Union[int, str],
        class_names: List[str],
        checkpoint_dir: str = None,
        undo_preprocessing_func: Callable[[torch.Tensor], np.ndarray] = undo_image_preprocessing,
        box_thickness: int = 2,
        image_scale: float = 1.0,
        gt_alpha: float = 0.4,
    ):
        """
        A helper function to visualize detections predicted by a network:
        saves images into a given path with a name that is {batch_name}_{image_idx_in_the_batch}.jpg, one batch per call.
        Colors are generated on the fly: uniformly sampled from color wheel to support all given classes.

        The input tensors are not modified: predictions and targets are copied before being rescaled.

        Adjustable:
            * Ground truth box transparency;
            * Box width;
            * Image size (larger or smaller than what's provided)

        :param image_tensor:            rgb images, (B, H, W, 3)
        :param pred_boxes:              boxes after NMS for each image in a batch, each (Num_boxes, 6),
                                        values on dim 1 are: x1, y1, x2, y2, confidence, class.
                                        An entry may be None when an image has no predictions
        :param target_boxes:            (Num_targets, 6), values on dim 1 are: image id in a batch, class, x y w h
                                        (coordinates scaled to [0, 1])
        :param batch_name:              id of the current batch to use for image naming

        :param class_names:             names of all classes, each on its own index
        :param checkpoint_dir:          a path where images with boxes will be saved. if None, the result images will
                                        be returned as a list of numpy image arrays

        :param undo_preprocessing_func: a function to convert preprocessed images tensor into a batch of cv2-like images
        :param box_thickness:           box line thickness in px
        :param image_scale:             scale of an image w.r.t. given image size,
                                        e.g. incoming images are (320x320), use scale = 2. to preview in (640x640)
        :param gt_alpha:                a value in [0., 1.] transparency on ground truth boxes,
                                        0 for invisible, 1 for fully opaque
        :return:                        list of result images when checkpoint_dir is None,
                                        otherwise an empty list (images are written to disk)
        """
        image_np = undo_preprocessing_func(image_tensor.detach())

        # Tensor.numpy() shares storage with a CPU tensor and the helpers below rescale their inputs
        # in place, so operate on copies to avoid silently modifying the caller's tensors.
        targets_np = target_boxes.detach().cpu().numpy().copy()
        targets = DetectionVisualization._scaled_ccwh_to_xyxy(targets_np, *image_np.shape[1:3], image_scale)

        out_images = []
        for i in range(image_np.shape[0]):
            preds = pred_boxes[i].detach().cpu().numpy().copy() if pred_boxes[i] is not None else np.empty((0, 6))
            targets_cur = targets[targets[:, 0] == i]

            image_name = "_".join([str(batch_name), str(i)])
            res_image = DetectionVisualization._visualize_image(
                image_np[i], preds, targets_cur, class_names, box_thickness, gt_alpha, image_scale, checkpoint_dir, image_name
            )
            if res_image is not None:
                out_images.append(res_image)

        return out_images

visualize_batch(image_tensor, pred_boxes, target_boxes, batch_name, class_names, checkpoint_dir=None, undo_preprocessing_func=undo_image_preprocessing, box_thickness=2, image_scale=1.0, gt_alpha=0.4) staticmethod

A helper function to visualize detections predicted by a network: saves images into a given path with a name that is {batch_name}_{image_idx_in_the_batch}.jpg, one batch per call. Colors are generated on the fly: uniformly sampled from color wheel to support all given classes.

Adjustable: * Ground truth box transparency; * Box width; * Image size (larger or smaller than what's provided)

Parameters:

Name Type Description Default
image_tensor torch.Tensor

rgb images, (B, H, W, 3)

required
pred_boxes List[torch.Tensor]

boxes after NMS for each image in a batch, each (Num_boxes, 6), values on dim 1 are: x1, y1, x2, y2, confidence, class

required
target_boxes torch.Tensor

(Num_targets, 6), values on dim 1 are: image id in a batch, class, x y w h (coordinates scaled to [0, 1])

required
batch_name Union[int, str]

id of the current batch to use for image naming

required
class_names List[str]

names of all classes, each on its own index

required
checkpoint_dir str

a path where images with boxes will be saved. If None, the result images will be returned as a list of numpy image arrays

None
undo_preprocessing_func Callable[[torch.Tensor], np.ndarray]

a function to convert preprocessed images tensor into a batch of cv2-like images

undo_image_preprocessing
box_thickness int

box line thickness in px

2
image_scale float

scale of an image w.r.t. given image size, e.g. incoming images are (320x320), use scale = 2. to preview in (640x640)

1.0
gt_alpha float

a value in [0., 1.] transparency on ground truth boxes, 0 for invisible, 1 for fully opaque

0.4
Source code in training/utils/detection_utils.py
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
@staticmethod
def visualize_batch(
    image_tensor: torch.Tensor,
    pred_boxes: List[torch.Tensor],
    target_boxes: torch.Tensor,
    batch_name: Union[int, str],
    class_names: List[str],
    checkpoint_dir: str = None,
    undo_preprocessing_func: Callable[[torch.Tensor], np.ndarray] = undo_image_preprocessing,
    box_thickness: int = 2,
    image_scale: float = 1.0,
    gt_alpha: float = 0.4,
):
    """
    A helper function to visualize detections predicted by a network:
    saves images into a given path with a name that is {batch_name}_{image_idx_in_the_batch}.jpg, one batch per call.
    Colors are generated on the fly: uniformly sampled from color wheel to support all given classes.

    The input tensors are not modified: predictions and targets are copied before being rescaled.

    Adjustable:
        * Ground truth box transparency;
        * Box width;
        * Image size (larger or smaller than what's provided)

    :param image_tensor:            rgb images, (B, H, W, 3)
    :param pred_boxes:              boxes after NMS for each image in a batch, each (Num_boxes, 6),
                                    values on dim 1 are: x1, y1, x2, y2, confidence, class.
                                    An entry may be None when an image has no predictions
    :param target_boxes:            (Num_targets, 6), values on dim 1 are: image id in a batch, class, x y w h
                                    (coordinates scaled to [0, 1])
    :param batch_name:              id of the current batch to use for image naming

    :param class_names:             names of all classes, each on its own index
    :param checkpoint_dir:          a path where images with boxes will be saved. if None, the result images will
                                    be returned as a list of numpy image arrays

    :param undo_preprocessing_func: a function to convert preprocessed images tensor into a batch of cv2-like images
    :param box_thickness:           box line thickness in px
    :param image_scale:             scale of an image w.r.t. given image size,
                                    e.g. incoming images are (320x320), use scale = 2. to preview in (640x640)
    :param gt_alpha:                a value in [0., 1.] transparency on ground truth boxes,
                                    0 for invisible, 1 for fully opaque
    :return:                        list of result images when checkpoint_dir is None,
                                    otherwise an empty list (images are written to disk)
    """
    image_np = undo_preprocessing_func(image_tensor.detach())

    # Tensor.numpy() shares storage with a CPU tensor and the helpers called below rescale their inputs
    # in place, so operate on copies to avoid silently modifying the caller's tensors.
    targets_np = target_boxes.detach().cpu().numpy().copy()
    targets = DetectionVisualization._scaled_ccwh_to_xyxy(targets_np, *image_np.shape[1:3], image_scale)

    out_images = []
    for i in range(image_np.shape[0]):
        preds = pred_boxes[i].detach().cpu().numpy().copy() if pred_boxes[i] is not None else np.empty((0, 6))
        targets_cur = targets[targets[:, 0] == i]

        image_name = "_".join([str(batch_name), str(i)])
        res_image = DetectionVisualization._visualize_image(
            image_np[i], preds, targets_cur, class_names, box_thickness, gt_alpha, image_scale, checkpoint_dir, image_name
        )
        if res_image is not None:
            out_images.append(res_image)

    return out_images

NMS_Type

Bases: str, Enum

Type of non max suppression algorithm that can be used for post processing detection

Source code in training/utils/detection_utils.py
343
344
345
346
347
348
349
class NMS_Type(str, Enum):
    """
    Type of non max suppression algorithm that can be used for post processing detection

    The enum also subclasses str, so members compare equal to their string values
    (e.g. NMS_Type.ITERATIVE == "iterative").
    """

    ITERATIVE = "iterative"
    MATRIX = "matrix"

PPYoloECollateFN

Collate function for PPYoloE training

Source code in training/utils/detection_utils.py
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
class PPYoloECollateFN:
    """
    Collate function for PPYoloE training
    """

    def __init__(self, random_resize_sizes: Union[List[int], None] = None, random_resize_modes: Union[List[int], None] = None):
        """
        :param random_resize_sizes: list of candidate target sizes. For every batch one size is chosen at random
                                    and used as both the output height and width. None disables random resizing.
        :param random_resize_modes: list of candidate interpolation flags passed to cv2.resize; one is chosen
                                    at random for every batch. Must be provided when random_resize_sizes is set.
        """
        self.random_resize_sizes = random_resize_sizes
        self.random_resize_modes = random_resize_modes

    def __repr__(self):
        return f"PPYoloECollateFN(random_resize_sizes={self.random_resize_sizes}, random_resize_modes={self.random_resize_modes})"

    def __str__(self):
        return self.__repr__()

    def __call__(self, data) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Optionally random-resize the samples, collate them and format the targets.

        :param data: list of (image, targets) samples
        :return: (images with the last axis moved to dim 1 and cast to float, formatted targets)
        """
        if self.random_resize_sizes is not None:
            data = self.random_resize(data)

        batch = default_collate(data)
        ims, targets = batch
        targets = self._format_targets(targets)
        ims = torch.moveaxis(ims, -1, 1).float()

        return ims, targets

    def random_resize(self, batch):
        """
        Resize every sample of the batch to the same randomly chosen size, using the same randomly chosen interpolation.

        :param batch: list of samples, see random_resize_sample
        :return: list of resized samples
        """
        target_size = random.choice(self.random_resize_sizes)
        interpolation = random.choice(self.random_resize_modes)
        return [self.random_resize_sample(sample, target_size, interpolation) for sample in batch]

    def random_resize_sample(self, sample, target_size, interpolation):
        """
        Resize a single sample to (target_size, target_size) and rescale its boxes accordingly.
        Columns 1-4 of the targets (and crowd targets) are scaled IN PLACE.

        :param sample:        (image, targets) or (image, targets, crowd_targets)
        :param target_size:   output height and width of the image
        :param interpolation: interpolation flag passed to cv2.resize
        :return:              the resized sample, with the same number of items as the input
        :raises RuntimeError: if the sample has neither 2 nor 3 items
        """
        if len(sample) == 2:
            image, targets = sample  # TARGETS ARE IN LABEL_CXCYWH
            crowd_targets = None
        elif len(sample) == 3:
            image, targets, crowd_targets = sample
        else:
            raise RuntimeError(f"Expected a sample of (image, targets) or (image, targets, crowd_targets), got {len(sample)} items")

        # Scale factors are computed from the original image shape, before resizing
        sy, sx = target_size / image.shape[0], target_size / image.shape[1]

        image = cv2.resize(
            image,
            dsize=(int(target_size), int(target_size)),
            interpolation=interpolation,
        )

        box_scale = np.array([[sx, sy, sx, sy]], dtype=targets.dtype)
        targets[:, 1:5] *= box_scale
        if crowd_targets is not None:
            crowd_targets[:, 1:5] *= box_scale
            return image, targets, crowd_targets

        return image, targets

    def _format_targets(self, targets: torch.Tensor) -> torch.Tensor:
        """
        Merge per-image targets into a single 2D tensor, prefixing every row with its image index.

        For every image, the number of rows whose element sum is positive is counted and that many
        leading rows are kept (the remaining rows are dropped).

        :param targets: 3D tensor indexed as (image, target row, target values)
        :return: 2D tensor of all kept rows of the batch; the first column is the image index in the batch,
                 followed by the original target values (e.g. 6 columns (index, c, cx, cy, w, h) for 5-value targets)
        """
        # Same collate as in YoloX. We convert to PPYoloTargets in the loss
        nlabel = (targets.sum(dim=2) > 0).sum(dim=1)  # number of label per image
        targets_merged = []
        for i in range(targets.shape[0]):
            targets_im = targets[i, : nlabel[i]]
            batch_column = targets.new_ones((targets_im.shape[0], 1)) * i
            targets_merged.append(torch.cat((batch_column, targets_im), 1))

        return torch.cat(targets_merged, 0)

__init__(random_resize_sizes=None, random_resize_modes=None)

Parameters:

Name Type Description Default
random_resize_sizes Union[List[int], None]

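list of candidate target sizes; one is chosen at random for each batch and used as both the output height and width. None disables random resizing.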

None
Source code in training/utils/detection_utils.py
686
687
688
689
690
691
692
def __init__(self, random_resize_sizes: Union[List[int], None] = None, random_resize_modes: Union[List[int], None] = None):
    """
    Store the random-resize configuration.

    :param random_resize_sizes: list of candidate target sizes. For every batch one size is chosen at random
                                and used as both the output height and width. None disables random resizing.
    :param random_resize_modes: list of candidate interpolation flags passed to cv2.resize; one is chosen
                                at random for every batch.
    """
    self.random_resize_sizes = random_resize_sizes
    self.random_resize_modes = random_resize_modes

adjust_box_anns(bbox, scale_ratio, padw, padh, w_max, h_max)

Adjusts the bbox annotations of rescaled, padded image.

Parameters:

Name Type Description Default
bbox

(np.array) bbox to modify.

required
scale_ratio

(float) scale ratio between rescale output image and original one.

required
padw

(int) width padding size.

required
padh

(int) height padding size.

required
w_max

(int) width border.

required
h_max

(int) height border

required

Returns:

Type Description

modified bbox (np.array)

Source code in training/utils/detection_utils.py
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
def adjust_box_anns(bbox, scale_ratio, padw, padh, w_max, h_max):
    """
    Adjusts the bbox annotations of rescaled, padded image. The array is modified in place and returned.

    :param bbox: (np.array) bbox to modify.
    :param scale_ratio: (float) scale ratio between rescale output image and original one.
    :param padw: (int) width padding size.
    :param padh: (int) height padding size.
    :param w_max: (int) width border.
    :param h_max: (int) height border
    :return: modified bbox (np.array)
    """
    # Columns 0, 2, ... are shifted by padw and clipped to [0, w_max];
    # columns 1, 3, ... are shifted by padh and clipped to [0, h_max].
    for first_column, padding, upper_bound in ((0, padw, w_max), (1, padh, h_max)):
        rescaled = bbox[:, first_column::2] * scale_ratio + padding
        bbox[:, first_column::2] = np.clip(rescaled, 0, upper_bound)
    return bbox

box_iou(box1, box2)

Return intersection-over-union (Jaccard index) of boxes. Both sets of boxes are expected to be in (x1, y1, x2, y2) format.

Parameters:

Name Type Description Default
box1 torch.Tensor

Tensor of shape [N, 4]

required
box2 torch.Tensor

Tensor of shape [M, 4]

required

Returns:

Type Description
torch.Tensor

iou, Tensor of shape [N, M]: the NxM matrix containing the pairwise IoU values for every element in boxes1 and boxes2

Source code in training/utils/detection_utils.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def box_iou(box1: torch.Tensor, box2: torch.Tensor) -> torch.Tensor:
    """
    Return intersection-over-union (Jaccard index) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    :param box1: Tensor of shape [N, 4]
    :param box2: Tensor of shape [M, 4]
    :return:     iou, Tensor of shape [N, M]: the NxM matrix containing the pairwise IoU values for every element in box1 and box2
    """
    # Adapted from https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py

    # Areas of the individual boxes: (x2 - x1) * (y2 - y1)
    area1 = (box1[:, 2] - box1[:, 0]) * (box1[:, 3] - box1[:, 1])
    area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1])

    # Pairwise intersection rectangle; box1 gets an extra axis so it broadcasts against box2
    top_left = torch.max(box1[:, None, :2], box2[:, :2])
    bottom_right = torch.min(box1[:, None, 2:], box2[:, 2:])
    # Negative extents mean "no overlap" and are clamped to zero before multiplying width by height
    inter = (bottom_right - top_left).clamp(0).prod(2)

    union = area1[:, None] + area2 - inter
    return inter / union

calc_bbox_iou_matrix(pred)

calculate iou for every pair of boxes in the boxes vector

Parameters:

Name Type Description Default
pred torch.Tensor

a 3-dimensional tensor containing all boxes for a batch of images [N, num_boxes, 4], where each box format is [x1,y1,x2,y2]

required

Returns:

Type Description

a 3-dimensional matrix where M_i_j_k is the iou of box j and box k of the i'th image in the batch

Source code in training/utils/detection_utils.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def calc_bbox_iou_matrix(pred: torch.Tensor):
    """
    Calculate the iou for every pair of boxes within each image of the batch.
    :param pred: a 3-dimensional tensor containing all boxes for a batch of images [N, num_boxes, 4], where
                 each box format is [x1,y1,x2,y2] (only the first 4 values of the last axis are used)
    :return: a 3-dimensional matrix where M_i_j_k is the iou of box j and box k of the i'th image in the batch
    """
    x1, y1, x2, y2 = (pred[:, :, coord] for coord in range(4))

    # "row" views are [N, 1, num_boxes] and "col" views are [N, num_boxes, 1], so that
    # combining them broadcasts to every pair of boxes of the same image
    row_x1, row_y1, row_x2, row_y2 = (coord.unsqueeze(1) for coord in (x1, y1, x2, y2))
    col_x1, col_y1, col_x2, col_y2 = (coord.unsqueeze(2) for coord in (x1, y1, x2, y2))

    # Overlap extents, clamped at zero for non-overlapping pairs
    overlap_w = (torch.min(row_x2, col_x2) - torch.max(row_x1, col_x1)).clamp(0)
    overlap_h = (torch.min(row_y2, col_y2) - torch.max(row_y1, col_y1)).clamp(0)
    intersection_area = overlap_w * overlap_h

    row_area = (row_x2 - row_x1) * (row_y2 - row_y1)
    col_area = (col_x2 - col_x1) * (col_y2 - col_y1)
    # The small constant is added to the union before dividing
    union_area = (row_area + 1e-16) + col_area - intersection_area

    return intersection_area / union_area

calculate_bbox_iou_matrix(box1, box2, x1y1x2y2=True, GIoU=False, DIoU=False, CIoU=False, eps=1e-09)

calculate the iou matrix containing the iou of every pair iou(i,j), where i is in box1 and j is in box2

Parameters:

Name Type Description Default
box1

a 2D tensor of boxes (shape N x 4)

required
box2

a 2D tensor of boxes (shape M x 4)

required
x1y1x2y2

boxes format is x1y1x2y2 (True) or xywh where xy is the center (False)

True

Returns:

Type Description

a 2D iou matrix (shape NxM)

Source code in training/utils/detection_utils.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def calculate_bbox_iou_matrix(box1, box2, x1y1x2y2=True, GIoU: bool = False, DIoU=False, CIoU=False, eps=1e-9):
    """
    Calculate the iou matrix containing the iou of every pair iou(i,j), where i is in box1 and j is in box2.
    :param box1: a 2D tensor of boxes (shape N x 4)
    :param box2: a 2D tensor of boxes (shape M x 4)
    :param x1y1x2y2: boxes format is x1y1x2y2 (True) or xywh where xy is the center (False)
    :param GIoU: forwarded unchanged to _iou (presumably selects the Generalized IoU variant - confirm in _iou)
    :param DIoU: forwarded unchanged to _iou (presumably selects the Distance IoU variant - confirm in _iou)
    :param CIoU: forwarded unchanged to _iou (presumably selects the Complete IoU variant - confirm in _iou)
    :param eps: forwarded unchanged to _iou
    :return: a 2D iou matrix (shape NxM)
    """
    # Put the coordinates of box1 on the first axis so that box1[k] is the k-th coordinate of all boxes
    if box1.dim() > 1:
        box1 = box1.T

    if not x1y1x2y2:
        # center/size format: corner = center -/+ half of the size
        b1_x1, b1_x2 = box1[0] - box1[2] / 2, box1[0] + box1[2] / 2
        b1_y1, b1_y2 = box1[1] - box1[3] / 2, box1[1] + box1[3] / 2
        b2_x1, b2_x2 = box2[:, 0] - box2[:, 2] / 2, box2[:, 0] + box2[:, 2] / 2
        b2_y1, b2_y2 = box2[:, 1] - box2[:, 3] / 2, box2[:, 1] + box2[:, 3] / 2
    else:
        # corner format: the coordinates can be used directly
        b1_x1, b1_y1, b1_x2, b1_y2 = box1[0], box1[1], box1[2], box1[3]
        b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3]

    # Add a trailing axis to the box1 coordinates so they broadcast against the box2 coordinates
    b1_x1 = b1_x1.unsqueeze(1)
    b1_y1 = b1_y1.unsqueeze(1)
    b1_x2 = b1_x2.unsqueeze(1)
    b1_y2 = b1_y2.unsqueeze(1)

    return _iou(CIoU, DIoU, GIoU, b1_x1, b1_x2, b1_y1, b1_y2, b2_x1, b2_x2, b2_y1, b2_y2, eps)

compute_box_area(box)

Compute the area of one or many boxes.

Parameters:

Name Type Description Default
box torch.Tensor

One or many boxes, shape = (4, ?), each box in format (x1, y1, x2, y2)

required

Returns:

Type Description
torch.Tensor

Area of every box, shape = (1, ?)

Source code in training/utils/detection_utils.py
791
792
793
794
795
796
797
798
def compute_box_area(box: torch.Tensor) -> torch.Tensor:
    """
    Compute the area of one or many boxes.
    :param box: One or many boxes, shape = (4, ?), each box in format (x1, y1, x2, y2)
    :return: Area of every box, one value per box (same shape as a single row of box)
    """
    # Coordinates live on the first axis: rows 0..3 are x1, y1, x2, y2
    width = box[2] - box[0]
    height = box[3] - box[1]
    return width * height

compute_detection_matching(output, targets, height, width, iou_thresholds, denormalize_targets, device, crowd_targets=None, top_k=100, return_on_cpu=True)

Match predictions (NMS output) and the targets (ground truth) with respect to IoU and confidence score.

Parameters:

Name Type Description Default
output List[torch.Tensor]

list (of length batch_size) of Tensors of shape (num_predictions, 6) format: (x1, y1, x2, y2, confidence, class_label) where x1,y1,x2,y2 are according to image size

required
targets torch.Tensor

targets for all images of shape (total_num_targets, 6) format: (index, x, y, w, h, label) where x,y,w,h are in range [0,1]

required
height int

dimensions of the image

required
width int

dimensions of the image

required
iou_thresholds torch.Tensor

Threshold to compute the mAP

required
device str

Device

required
crowd_targets Optional[torch.Tensor]

crowd targets for all images of shape (total_num_crowd_targets, 6) format: (index, x, y, w, h, label) where x,y,w,h are in range [0,1]

None
top_k int

Number of predictions to keep per class, ordered by confidence score

100
denormalize_targets bool

If True, denormalize the targets and crowd_targets

required
return_on_cpu bool

If True, the output will be returned on "CPU", otherwise it will be returned on "device"

True

Returns:

Type Description
List[Tuple]

list of the following tensors, for every image: :preds_matched: Tensor of shape (num_img_predictions, n_iou_thresholds) True when prediction (i) is matched with a target with respect to the (j)th IoU threshold :preds_to_ignore: Tensor of shape (num_img_predictions, n_iou_thresholds) True when prediction (i) is matched with a crowd target with respect to the (j)th IoU threshold :preds_scores: Tensor of shape (num_img_predictions), confidence score for every prediction :preds_cls: Tensor of shape (num_img_predictions), predicted class for every prediction :targets_cls: Tensor of shape (num_img_targets), ground truth class for every target

Source code in training/utils/detection_utils.py
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
def compute_detection_matching(
    output: List[torch.Tensor],
    targets: torch.Tensor,
    height: int,
    width: int,
    iou_thresholds: torch.Tensor,
    denormalize_targets: bool,
    device: str,
    crowd_targets: Optional[torch.Tensor] = None,
    top_k: int = 100,
    return_on_cpu: bool = True,
) -> List[Tuple]:
    """
    Run the per-image prediction/target matching for every image of a batch.

    :param output:          list (of length batch_size) of Tensors of shape (num_predictions, 6), or None for an
                            image without predictions
                            format:     (x1, y1, x2, y2, confidence, class_label) where x1,y1,x2,y2 are according to image size
    :param targets:         targets for all images of shape (total_num_targets, 6)
                            format:     (index, label, cx, cy, w, h); column 0 is the image index inside the batch,
                            the remaining 5 columns are forwarded to compute_img_detection_matching
    :param height:          dimensions of the image
    :param width:           dimensions of the image
    :param iou_thresholds:  Threshold to compute the mAP
    :param denormalize_targets: If True, denormalize the targets and crowd_targets
    :param device:          Device on which the matching is computed
    :param crowd_targets:   crowd targets for all images of shape (total_num_crowd_targets, 6), same layout as targets.
                            When None, an empty (0, 6) tensor is used instead
    :param top_k:           Number of predictions to keep per class, ordered by confidence score
    :param return_on_cpu:   If True, the output will be returned on "CPU", otherwise it will be returned on "device"

    :return:                list with one tuple per image, as returned by compute_img_detection_matching:
        :preds_matched:     Tensor of shape (num_img_predictions, n_iou_thresholds)
                            True when prediction (i) is matched with a target with respect to the (j)th IoU threshold
        :preds_to_ignore:   Tensor of shape (num_img_predictions, n_iou_thresholds)
                            True when prediction (i) is matched with a crowd target with respect to the (j)th IoU threshold
        :preds_scores:      Tensor of shape (num_img_predictions), confidence score for every prediction
        :preds_cls:         Tensor of shape (num_img_predictions), predicted class for every prediction
        :targets_cls:       Tensor of shape (num_img_targets), ground truth class for every target
    """
    targets = targets.to(device)
    iou_thresholds = iou_thresholds.to(device)

    # A missing crowd_targets is replaced by an empty tensor so the per-image filtering below still works
    if crowd_targets is None:
        crowd_targets = torch.zeros(size=(0, 6), device=device)
    else:
        crowd_targets = crowd_targets.to(device)

    # Column 0 holds the index of the image each (crowd) target belongs to
    target_img_idx = targets[:, 0]
    crowd_img_idx = crowd_targets[:, 0]

    batch_metrics = []
    for img_i, img_preds in enumerate(output):
        # Predictions are moved to the device one image at a time; None means "no prediction for this image"
        if img_preds is None:
            img_preds = torch.zeros(size=(0, 6), device=device)
        else:
            img_preds = img_preds.to(device)

        batch_metrics.append(
            compute_img_detection_matching(
                preds=img_preds,
                targets=targets[target_img_idx == img_i, 1:],
                crowd_targets=crowd_targets[crowd_img_idx == img_i, 1:],
                denormalize_targets=denormalize_targets,
                height=height,
                width=width,
                device=device,
                iou_thresholds=iou_thresholds,
                top_k=top_k,
                return_on_cpu=return_on_cpu,
            )
        )

    return batch_metrics

compute_detection_metrics(preds_matched, preds_to_ignore, preds_scores, preds_cls, targets_cls, device, recall_thresholds=None, score_threshold=0.1)

Compute precision, recall, AP and F1 for every IoU threshold and for every class.

Parameters:

Name Type Description Default
preds_matched torch.Tensor

Tensor of shape (num_predictions, n_iou_thresholds) True when prediction (i) is matched with a target with respect to the (j)th IoU threshold

required
preds_scores torch.Tensor

Tensor of shape (num_predictions), confidence score for every prediction

required
preds_cls torch.Tensor

Tensor of shape (num_predictions), predicted class for every prediction

required
targets_cls torch.Tensor

Tensor of shape (num_targets), ground truth class for every target box to be detected

required
recall_thresholds Optional[torch.Tensor]

Recall thresholds used to compute MaP.

None
score_threshold Optional[float]

Minimum confidence score to consider a prediction for the computation of precision, recall and f1 (not MaP)

0.1
device str

Device

required

Returns:

Type Description
Tuple

:ap, precision, recall, f1: Tensors of shape (n_class, nb_iou_thrs) :unique_classes: Vector with all unique target classes

Source code in training/utils/detection_utils.py
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
def compute_detection_metrics(
    preds_matched: torch.Tensor,
    preds_to_ignore: torch.Tensor,
    preds_scores: torch.Tensor,
    preds_cls: torch.Tensor,
    targets_cls: torch.Tensor,
    device: str,
    recall_thresholds: Optional[torch.Tensor] = None,
    score_threshold: Optional[float] = 0.1,
) -> Tuple:
    """
    Compute AP, precision, recall and f1 for every class present in the targets and for every IoU threshold.

    :param preds_matched:      Tensor of shape (num_predictions, n_iou_thresholds)
                                    True when prediction (i) is matched with a target with respect to the (j)th IoU threshold
    :param preds_to_ignore:    Tensor of shape (num_predictions, n_iou_thresholds)
                                    True when prediction (i) is matched with a crowd target with respect to the (j)th IoU threshold
    :param preds_scores:       Tensor of shape (num_predictions), confidence score for every prediction
    :param preds_cls:          Tensor of shape (num_predictions), predicted class for every prediction
    :param targets_cls:        Tensor of shape (num_targets), ground truth class for every target box to be detected
    :param device:             Device on which the metrics are computed
    :param recall_thresholds:  Recall thresholds used to compute MaP. Defaults to 101 evenly spaced values in [0, 1]
    :param score_threshold:    Minimum confidence score to consider a prediction for the computation of
                                    precision, recall and f1 (not MaP)

    :return:
        :ap, precision, recall, f1: Tensors of shape (n_class, nb_iou_thrs)
        :unique_classes:            Vector with all unique target classes
    """
    preds_matched = preds_matched.to(device)
    preds_to_ignore = preds_to_ignore.to(device)
    preds_scores = preds_scores.to(device)
    preds_cls = preds_cls.to(device)
    targets_cls = targets_cls.to(device)

    if recall_thresholds is None:
        recall_thresholds = torch.linspace(0, 1, 101, device=device)
    else:
        recall_thresholds = recall_thresholds.to(device)

    # Only classes that appear in the ground truth get a row in the result
    unique_classes = torch.unique(targets_cls)
    n_class = len(unique_classes)
    nb_iou_thrs = preds_matched.shape[-1]

    ap, precision, recall = (torch.zeros((n_class, nb_iou_thrs), device=device) for _ in range(3))

    for row, class_id in enumerate(unique_classes):
        pred_mask = preds_cls == class_id
        ap[row, :], precision[row, :], recall[row, :] = compute_detection_metrics_per_cls(
            preds_matched=preds_matched[pred_mask],
            preds_to_ignore=preds_to_ignore[pred_mask],
            preds_scores=preds_scores[pred_mask],
            n_targets=(targets_cls == class_id).sum(),
            recall_thresholds=recall_thresholds,
            score_threshold=score_threshold,
            device=device,
        )

    # The small constant keeps f1 at 0 (instead of NaN) when precision and recall are both 0
    f1 = 2 * precision * recall / (precision + recall + 1e-16)

    return ap, precision, recall, f1, unique_classes

compute_detection_metrics_per_cls(preds_matched, preds_to_ignore, preds_scores, n_targets, recall_thresholds, score_threshold, device)

Compute precision, recall and AP of a given class for every IoU threshold.

:param preds_matched:      Tensor of shape (num_predictions, n_iou_thresholds)
                                True when prediction (i) is matched with a target
                                with respect to the(j)th IoU threshold
:param preds_to_ignore     Tensor of shape (num_predictions, n_iou_thresholds)
                                True when prediction (i) is matched with a crowd target
                                with respect to the (j)th IoU threshold
:param preds_scores:       Tensor of shape (num_predictions), confidence score for every prediction
:param n_targets:          Number of target boxes of this class
:param recall_thresholds:  Tensor of shape (max_n_rec_thresh) list of recall thresholds used to compute MaP
:param score_threshold:    Minimum confidence score to consider a prediction for the computation of
                                precision and recall (not MaP)
:param device:             Device

:return ap, precision, recall:  Tensors of shape (nb_iou_thrs)
Source code in training/utils/detection_utils.py
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
def compute_detection_metrics_per_cls(
    preds_matched: torch.Tensor,
    preds_to_ignore: torch.Tensor,
    preds_scores: torch.Tensor,
    n_targets: int,
    recall_thresholds: torch.Tensor,
    score_threshold: float,
    device: str,
):
    """
    Compute precision, recall and AP of a given class for every IoU threshold.

        :param preds_matched:      Tensor of shape (num_predictions, n_iou_thresholds)
                                        True when prediction (i) is matched with a target
                                        with respect to the (j)th IoU threshold
        :param preds_to_ignore:    Tensor of shape (num_predictions, n_iou_thresholds)
                                        True when prediction (i) is matched with a crowd target
                                        with respect to the (j)th IoU threshold
        :param preds_scores:       Tensor of shape (num_predictions), confidence score for every prediction
        :param n_targets:          Number of target boxes of this class
        :param recall_thresholds:  Tensor of shape (max_n_rec_thresh) list of recall thresholds used to compute MaP
        :param score_threshold:    Minimum confidence score to consider a prediction for the computation of
                                        precision and recall (not MaP). The bound is inclusive: a prediction whose
                                        score equals score_threshold is taken into account
        :param device:             Device

        :return ap, precision, recall:  Tensors of shape (nb_iou_thrs)
    """
    nb_iou_thrs = preds_matched.shape[-1]

    # A prediction is a false positive when it is neither matched with a target nor flagged as "to ignore"
    tps = preds_matched
    fps = torch.logical_and(torch.logical_not(preds_matched), torch.logical_not(preds_to_ignore))

    # Without any prediction for this class every metric is 0
    if len(tps) == 0:
        return torch.zeros(nb_iou_thrs, device=device), torch.zeros(nb_iou_thrs, device=device), torch.zeros(nb_iou_thrs, device=device)

    # Sort by decreasing score (boolean CUDA scores are cast to uint8 before sorting)
    dtype = torch.uint8 if preds_scores.is_cuda and preds_scores.dtype is torch.bool else preds_scores.dtype
    sort_ind = torch.argsort(preds_scores.to(dtype), descending=True)
    tps = tps[sort_ind, :]
    fps = fps[sort_ind, :]
    preds_scores = preds_scores[sort_ind].contiguous()

    # Rolling sum over the predictions
    rolling_tps = torch.cumsum(tps, dim=0, dtype=torch.float)
    rolling_fps = torch.cumsum(fps, dim=0, dtype=torch.float)

    rolling_recalls = rolling_tps / n_targets
    rolling_precisions = rolling_tps / (rolling_tps + rolling_fps + torch.finfo(torch.float64).eps)

    # Reversed cummax to only have decreasing values
    rolling_precisions = rolling_precisions.flip(0).cummax(0).values.flip(0)

    # ==================
    # RECALL & PRECISION

    # We want the rolling precision/recall at index i so that: preds_scores[i-1] >= score_threshold > preds_scores[i]
    # Note: torch.searchsorted works on increasing sequence and preds_scores is decreasing, so we work with "-".
    # right=True returns the insertion point AFTER elements equal to -score_threshold, which makes the threshold
    # inclusive (a prediction with score == score_threshold is kept), as the condition above requires.
    n_preds_above_threshold = torch.searchsorted(-preds_scores, -score_threshold, right=True)

    if n_preds_above_threshold == 0:  # Here score_threshold > preds_scores[0], so no pred reaches the threshold
        recall = torch.zeros(nb_iou_thrs, device=device)
        precision = torch.zeros(nb_iou_thrs, device=device)  # the precision is not really defined when no pred but we need to give it a value
    else:
        recall = rolling_recalls[n_preds_above_threshold - 1]
        precision = rolling_precisions[n_preds_above_threshold - 1]

    # ==================
    # AVERAGE PRECISION

    # shape = (nb_iou_thrs, n_recall_thresholds)
    recall_thresholds = recall_thresholds.view(1, -1).repeat(nb_iou_thrs, 1)

    # We want the index i so that: rolling_recalls[i-1] < recall_thresholds[k] <= rolling_recalls[i]
    # Note:  when recall_thresholds[k] > max(rolling_recalls), i = len(rolling_recalls)
    # Note2: we work with transpose (.T) to apply torch.searchsorted on first dim instead of the last one
    recall_threshold_idx = torch.searchsorted(rolling_recalls.T.contiguous(), recall_thresholds, right=False).T

    # When recall_thresholds[k] > max(rolling_recalls), rolling_precisions[i] is not defined, and we want precision = 0
    rolling_precisions = torch.cat((rolling_precisions, torch.zeros(1, nb_iou_thrs, device=device)), dim=0)

    # shape = (n_recall_thresholds, nb_iou_thrs)
    sampled_precision_points = torch.gather(input=rolling_precisions, index=recall_threshold_idx, dim=0)

    # Average over the recall_thresholds
    ap = sampled_precision_points.mean(0)

    return ap, precision, recall

compute_img_detection_matching(preds, targets, crowd_targets, height, width, iou_thresholds, device, denormalize_targets, top_k=100, return_on_cpu=True)

Match predictions (NMS output) and the targets (ground truth) with respect to IoU and confidence score for a given image.

Parameters:

Name Type Description Default
preds torch.Tensor

Tensor of shape (num_img_predictions, 6) format: (x1, y1, x2, y2, confidence, class_label) where x1,y1,x2,y2 are according to image size

required
targets torch.Tensor

targets for this image of shape (num_img_targets, 5) format: (label, cx, cy, w, h) where cx,cy,w,h are normalized to [0,1] when denormalize_targets is True

required
height int

dimensions of the image

required
width int

dimensions of the image

required
iou_thresholds torch.Tensor

Threshold to compute the mAP

required
device str required
crowd_targets torch.Tensor

crowd targets for this image of shape (num_img_crowd_targets, 5) format: (label, cx, cy, w, h) where cx,cy,w,h are normalized to [0,1] when denormalize_targets is True

required
top_k int

Number of predictions to keep per class, ordered by confidence score

100
denormalize_targets bool

If True, denormalize the targets and crowd_targets

required
return_on_cpu bool

If True, the output will be returned on "CPU", otherwise it will be returned on "device"

True

Returns:

Type Description
Tuple

:preds_matched: Tensor of shape (num_img_predictions, n_iou_thresholds) True when prediction (i) is matched with a target with respect to the (j)th IoU threshold :preds_to_ignore: Tensor of shape (num_img_predictions, n_iou_thresholds) True when prediction (i) is matched with a crowd target with respect to the (j)th IoU threshold :preds_scores: Tensor of shape (num_img_predictions), confidence score for every prediction :preds_cls: Tensor of shape (num_img_predictions), predicted class for every prediction :targets_cls: Tensor of shape (num_img_targets), ground truth class for every target

Source code in training/utils/detection_utils.py
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
def compute_img_detection_matching(
    preds: torch.Tensor,
    targets: torch.Tensor,
    crowd_targets: torch.Tensor,
    height: int,
    width: int,
    iou_thresholds: torch.Tensor,
    device: str,
    denormalize_targets: bool,
    top_k: int = 100,
    return_on_cpu: bool = True,
) -> Tuple:
    """
    Match predictions (NMS output) and the targets (ground truth) with respect to IoU and confidence score
    for a given image.

    Only the top_k predictions of every class take part in the matching; all the other predictions are flagged
    in preds_to_ignore for every IoU threshold.

    :param preds:           Tensor of shape (num_img_predictions, 6)
                            format:     (x1, y1, x2, y2, confidence, class_label) where x1,y1,x2,y2 are according to image size
                            May be None or empty, in which case empty results are returned.
    :param targets:         targets for this image of shape (num_img_targets, 5)
                            format:     (label, cx, cy, w, h); column 0 is read as the class and columns 1:5 as the box
    :param crowd_targets:   crowd targets for this image of shape (num_img_crowd_targets, 5)
                            format:     (label, cx, cy, w, h), same layout as targets
    :param height:          dimensions of the image
    :param width:           dimensions of the image
    :param iou_thresholds:  Threshold to compute the mAP
    :param device:          Device
    :param denormalize_targets: If True, denormalize the targets and crowd_targets
                            (x coordinates are multiplied by width, y coordinates by height)
    :param top_k:           Number of predictions to keep per class, ordered by confidence score
    :param return_on_cpu:   If True, the output will be returned on "CPU", otherwise it will be returned on "device"

    :return:
        :preds_matched:     Tensor of shape (num_img_predictions, n_iou_thresholds)
                                True when prediction (i) is matched with a target with respect to the (j)th IoU threshold
        :preds_to_ignore:   Tensor of shape (num_img_predictions, n_iou_thresholds)
                                True when prediction (i) is not among the top_k of its class, or is matched with a
                                crowd target with respect to the (j)th IoU threshold
        :preds_scores:      Tensor of shape (num_img_predictions), confidence score for every prediction
        :preds_cls:         Tensor of shape (num_img_predictions), predicted class for every prediction
        :targets_cls:       Tensor of shape (num_img_targets), ground truth class for every target
    """
    num_iou_thresholds = len(iou_thresholds)

    # No prediction for this image: return empty placeholders, only targets_cls carries information
    if preds is None or len(preds) == 0:
        if return_on_cpu:
            device = "cpu"
        preds_matched = torch.zeros((0, num_iou_thresholds), dtype=torch.bool, device=device)
        preds_to_ignore = torch.zeros((0, num_iou_thresholds), dtype=torch.bool, device=device)
        preds_scores = torch.tensor([], dtype=torch.float32, device=device)
        preds_cls = torch.tensor([], dtype=torch.float32, device=device)
        targets_cls = targets[:, 0].to(device=device)
        return preds_matched, preds_to_ignore, preds_scores, preds_cls, targets_cls

    preds_matched = torch.zeros(len(preds), num_iou_thresholds, dtype=torch.bool, device=device)
    targets_matched = torch.zeros(len(targets), num_iou_thresholds, dtype=torch.bool, device=device)
    preds_to_ignore = torch.zeros(len(preds), num_iou_thresholds, dtype=torch.bool, device=device)

    # Column slices of the inputs. NOTE(review): these are basic slices, so they presumably share memory with
    # preds / targets / crowd_targets - the in-place box conversions below would then modify the tensors that
    # were passed in. Confirm callers always pass copies.
    preds_cls, preds_box, preds_scores = preds[:, -1], preds[:, 0:4], preds[:, 4]
    targets_cls, targets_box = targets[:, 0], targets[:, 1:5]
    crowd_targets_cls, crowd_target_box = crowd_targets[:, 0], crowd_targets[:, 1:5]

    # Ignore all but the predictions that were top_k for their class
    preds_idx_to_use = get_top_k_idx_per_cls(preds_scores, preds_cls, top_k)
    preds_to_ignore[:, :] = True
    preds_to_ignore[preds_idx_to_use] = False

    if len(targets) > 0 or len(crowd_targets) > 0:

        # CHANGE bboxes TO FIT THE IMAGE SIZE
        # The return value is not used, so preds is presumably modified in place (which also updates preds_box)
        change_bbox_bounds_for_image_size(preds, (height, width))

        # Convert (cx, cy, w, h) boxes to (x1, y1, x2, y2) so they are comparable with preds_box
        targets_box = cxcywh2xyxy(targets_box)
        crowd_target_box = cxcywh2xyxy(crowd_target_box)

        if denormalize_targets:
            # Scale x coordinates by the image width and y coordinates by the image height
            targets_box[:, [0, 2]] *= width
            targets_box[:, [1, 3]] *= height
            crowd_target_box[:, [0, 2]] *= width
            crowd_target_box[:, [1, 3]] *= height

    if len(targets) > 0:

        # shape = (n_preds x n_targets)
        iou = box_iou(preds_box[preds_idx_to_use], targets_box)

        # Fill IoU values at index (i, j) with 0 when the prediction (i) and target(j) are of different class
        # Filling with 0 is equivalent to ignoring these values since we want IoU > iou_threshold > 0
        cls_mismatch = preds_cls[preds_idx_to_use].view(-1, 1) != targets_cls.view(1, -1)
        iou[cls_mismatch] = 0

        # The matching priority is first detection confidence and then IoU value.
        # The detection is already sorted by confidence in NMS, so here for each prediction we order the targets by iou.
        sorted_iou, target_sorted = iou.sort(descending=True, stable=True)

        # Only iterate over IoU values higher than min threshold to speed up the process
        # (iou_thresholds[0] is used as that minimum threshold)
        for pred_selected_i, target_sorted_i in (sorted_iou > iou_thresholds[0]).nonzero(as_tuple=False):

            # pred_selected_i and target_sorted_i are relative to filters/sorting, so we extract their absolute indexes
            pred_i = preds_idx_to_use[pred_selected_i]
            target_i = target_sorted[pred_selected_i, target_sorted_i]

            # Vector[j], True when IoU(pred_i, target_i) is above the (j)th threshold
            is_iou_above_threshold = sorted_iou[pred_selected_i, target_sorted_i] > iou_thresholds

            # Vector[j], True when both pred_i and target_i are not matched yet for the (j)th threshold
            are_candidates_free = torch.logical_and(~preds_matched[pred_i, :], ~targets_matched[target_i, :])

            # Vector[j], True when (pred_i, target_i) can be matched for the (j)th threshold
            are_candidates_good = torch.logical_and(is_iou_above_threshold, are_candidates_free)

            # For every threshold (j) where target_i and pred_i can be matched together ( are_candidates_good[j]==True )
            # fill the matching placeholders with True
            targets_matched[target_i, are_candidates_good] = True
            preds_matched[pred_i, are_candidates_good] = True

            # When all the targets are matched with a prediction for every IoU Threshold, stop.
            if targets_matched.all():
                break

    # Crowd targets can be matched with many predictions.
    # Therefore, for every prediction we just need to check if it has IoA large enough with any crowd target.
    if len(crowd_targets) > 0:

        # shape = (n_preds_to_use x n_crowd_targets)
        ioa = crowd_ioa(preds_box[preds_idx_to_use], crowd_target_box)

        # Fill IoA values at index (i, j) with 0 when the prediction (i) and target(j) are of different class
        # Filling with 0 is equivalent to ignoring these values since we want IoA > threshold > 0
        cls_mismatch = preds_cls[preds_idx_to_use].view(-1, 1) != crowd_targets_cls.view(1, -1)
        ioa[cls_mismatch] = 0

        # For each prediction, we keep its highest score with any crowd target (of same class)
        # shape = (n_preds_to_use)
        best_ioa, _ = ioa.max(1)

        # If a prediction has IoA higher than threshold (with any target of same class), then there is a match
        # shape = (n_preds_to_use x iou_thresholds)
        is_matching_with_crowd = best_ioa.view(-1, 1) > iou_thresholds.view(1, -1)

        preds_to_ignore[preds_idx_to_use] = torch.logical_or(preds_to_ignore[preds_idx_to_use], is_matching_with_crowd)

    if return_on_cpu:
        preds_matched = preds_matched.to("cpu")
        preds_to_ignore = preds_to_ignore.to("cpu")
        preds_scores = preds_scores.to("cpu")
        preds_cls = preds_cls.to("cpu")
        targets_cls = targets_cls.to("cpu")

    return preds_matched, preds_to_ignore, preds_scores, preds_cls, targets_cls

convert_cxcywh_bbox_to_xyxy(input_bbox)

Converts bounding box format from [cx, cy, w, h] to [x1, y1, x2, y2] :param input_bbox: input bbox either 2-dimensional (for all boxes of a single image) or 3-dimensional (for boxes of a batch of images) :return: Converted bbox in same dimensions as the original

Source code in training/utils/detection_utils.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def convert_cxcywh_bbox_to_xyxy(input_bbox: torch.Tensor):
    """
    Converts bounding box format from [cx, cy, w, h] to [x1, y1, x2, y2]
        :param input_bbox:  input bbox either 2-dimensional (for all boxes of a single image) or 3-dimensional (for
                            boxes of a batch of images). Both torch.Tensor and np.ndarray are accepted.
        :return:            Converted bbox in same dimensions (and of the same container type) as the original.
                            The input is left untouched, a new tensor/array is returned.
    """
    is_tensor = isinstance(input_bbox, torch.Tensor)

    # The input is always processed as a batch. When it is not a batch, a leading axis is added here and removed
    # again before returning. `.ndim` and `[None]` are used because they exist on both tensors and numpy arrays.
    need_squeeze = input_bbox.ndim < 3
    if need_squeeze:
        input_bbox = input_bbox[None]

    converted_bbox = torch.zeros_like(input_bbox) if is_tensor else np.zeros_like(input_bbox)

    half_w = input_bbox[:, :, 2] / 2
    half_h = input_bbox[:, :, 3] / 2
    converted_bbox[:, :, 0] = input_bbox[:, :, 0] - half_w
    converted_bbox[:, :, 1] = input_bbox[:, :, 1] - half_h
    converted_bbox[:, :, 2] = input_bbox[:, :, 0] + half_w
    converted_bbox[:, :, 3] = input_bbox[:, :, 1] + half_h

    # squeeze back if needed
    if need_squeeze:
        converted_bbox = converted_bbox[0]

    return converted_bbox

crowd_ioa(det_box, crowd_box)

Return intersection-over-detection_area of boxes, used for crowd ground truths. Both sets of boxes are expected to be in (x1, y1, x2, y2) format.

Parameters:

Name Type Description Default
det_box torch.Tensor

Tensor of shape [N, 4]

required
crowd_box torch.Tensor

Tensor of shape [M, 4]

required

Returns:

Type Description
torch.Tensor

crowd_ioa, Tensor of shape [N, M]: the NxM matrix containing the pairwise IoA values for every element in det_box and crowd_box

Source code in training/utils/detection_utils.py
801
802
803
804
805
806
807
808
809
810
811
812
813
814
def crowd_ioa(det_box: torch.Tensor, crowd_box: torch.Tensor) -> torch.Tensor:
    """
    Compute the pairwise intersection-over-detection-area (IoA) between detections and crowd ground truths.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.

    :param det_box:     Tensor of shape [N, 4]
    :param crowd_box:   Tensor of shape [M, 4]
    :return: crowd_ioa, Tensor of shape [N, M]: the NxM matrix containing the pairwise IoA values for every element in det_box and crowd_box
    """
    det_area = compute_box_area(det_box.T)

    # Broadcast every detection against every crowd box: both corner tensors have shape (N, M, 2)
    bottom_right = torch.min(det_box[:, None, 2:], crowd_box[:, 2:])
    top_left = torch.max(det_box[:, None, :2], crowd_box[:, :2])

    # Negative extents mean "no overlap" and are clamped to 0 before multiplying width by height
    overlap_extent = (bottom_right - top_left).clamp(0)
    inter = overlap_extent.prod(2)

    # The intersection is normalized by the detection area only (not by the union)
    return inter / det_area[:, None]

cxcywh2xyxy(bboxes)

Transforms bboxes from centerized xy wh format to xyxy format

Parameters:

Name Type Description Default
bboxes

array, shaped (nboxes, 4)

required

Returns:

Type Description

modified bboxes

Source code in training/utils/detection_utils.py
597
598
599
600
601
602
603
604
605
606
607
def cxcywh2xyxy(bboxes):
    """
    Convert boxes from (cx, cy, w, h) to (x1, y1, x2, y2), overwriting the input in place.
    :param bboxes: array, shaped (nboxes, 4)
    :return: modified bboxes (the same object that was passed in)
    """
    # Top-left corner = center - half of the size
    bboxes[:, :2] = bboxes[:, :2] - bboxes[:, 2:4] * 0.5
    # Bottom-right corner = size + top-left corner (columns 0:2 already hold the top-left corner here)
    bboxes[:, 2:4] = bboxes[:, 2:4] + bboxes[:, :2]
    return bboxes

get_cls_posx_in_target(target_format)

Get the position of the class label within a target of the given format

Parameters:

Name Type Description Default
target_format DetectionTargetsFormat

Representation of the target (ex: LABEL_XYXY)

required

Returns:

Type Description
int

Position of the class id in a bbox ex: 0 if bbox of format label_xyxy | -1 if bbox of format xyxy_label

Source code in training/utils/detection_utils.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def get_cls_posx_in_target(target_format: DetectionTargetsFormat) -> int:
    """Get the position of the class label inside a target of the given format
    :param target_format:   Representation of the target (ex: LABEL_XYXY)
    :return:                Position of the class id in a bbox
                                ex: 0 if bbox of format label_xyxy | -1 if bbox of format xyxy_label
    :raises NotImplementedError: when "LABEL" is neither the first nor the last "_"-separated part of the format
    """
    tokens = target_format.value.split("_")
    first_token, last_token = tokens[0], tokens[-1]

    # The label leads the target
    if first_token == "LABEL":
        return 0
    # The label closes the target
    if last_token == "LABEL":
        return -1
    raise NotImplementedError(f"No implementation to find index of LABEL in {target_format.value}")

get_mosaic_coordinate(mosaic_index, xc, yc, w, h, input_h, input_w)

Returns the mosaic coordinates of final mosaic image according to mosaic image index.

Parameters:

Name Type Description Default
mosaic_index

(int) mosaic image index

required
xc

(int) center x coordinate of the entire mosaic grid.

required
yc

(int) center y coordinate of the entire mosaic grid.

required
w

(int) width of bbox

required
h

(int) height of bbox

required
input_h

(int) image input height (should be 1/2 of the final mosaic output image height).

required
input_w

(int) image input width (should be 1/2 of the final mosaic output image width).

required

Returns:

Type Description

(x1, y1, x2, y2), (x1s, y1s, x2s, y2s) where (x1, y1, x2, y2) are the coordinates in the final mosaic output image, and (x1s, y1s, x2s, y2s) are the coordinates in the placed image.

Source code in training/utils/detection_utils.py
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def get_mosaic_coordinate(mosaic_index, xc, yc, w, h, input_h, input_w):
    """
    Returns the mosaic coordinates of final mosaic image according to mosaic image index.

    The mosaic canvas is clipped to (input_w * 2, input_h * 2); whatever part of the placed image would fall
    outside of it is cropped away, which is why two boxes are returned.

    :param mosaic_index: (int) mosaic image index: 0 = top left, 1 = top right, 2 = bottom left, 3 = bottom right
    :param xc: (int) center x coordinate of the entire mosaic grid.
    :param yc: (int) center y coordinate of the entire mosaic grid.
    :param w: (int) width of bbox
    :param h: (int) height of bbox
    :param input_h: (int) image input height (should be 1/2 of the final mosaic output image height).
    :param input_w: (int) image input width (should be 1/2 of the final mosaic output image width).
    :return: (x1, y1, x2, y2), (x1s, y1s, x2s, y2s) where (x1, y1, x2, y2) are the coordinates in the final mosaic
        output image, and (x1s, y1s, x2s, y2s) are the coordinates in the placed image.
    :raises ValueError: if mosaic_index is not one of 0, 1, 2, 3
    """
    # index0 to top left part of image
    if mosaic_index == 0:
        x1, y1, x2, y2 = max(xc - w, 0), max(yc - h, 0), xc, yc
        small_coord = w - (x2 - x1), h - (y2 - y1), w, h
    # index1 to top right part of image
    elif mosaic_index == 1:
        x1, y1, x2, y2 = xc, max(yc - h, 0), min(xc + w, input_w * 2), yc
        small_coord = 0, h - (y2 - y1), min(w, x2 - x1), h
    # index2 to bottom left part of image
    elif mosaic_index == 2:
        x1, y1, x2, y2 = max(xc - w, 0), yc, xc, min(input_h * 2, yc + h)
        small_coord = w - (x2 - x1), 0, w, min(y2 - y1, h)
    # index3 to bottom right part of image
    elif mosaic_index == 3:
        x1, y1, x2, y2 = xc, yc, min(xc + w, input_w * 2), min(input_h * 2, yc + h)  # noqa
        small_coord = 0, 0, min(w, x2 - x1), min(y2 - y1, h)
    else:
        # Without this branch an unknown index would surface as an unrelated UnboundLocalError on the return below
        raise ValueError(f"mosaic_index must be one of 0, 1, 2, 3, got {mosaic_index!r}")
    return (x1, y1, x2, y2), small_coord

get_top_k_idx_per_cls(preds_scores, preds_cls, top_k)

Get the indexes of all the top k predictions for every class

Parameters:

Name Type Description Default
preds_scores torch.Tensor

The confidence scores, vector of shape (n_pred)

required
preds_cls torch.Tensor

The predicted class, vector of shape (n_pred)

required
top_k int

Number of predictions to keep per class, ordered by confidence score

required

Returns:

Type Description

Indexes of the top k predictions. length <= (k * n_unique_class)

Source code in training/utils/detection_utils.py
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
def get_top_k_idx_per_cls(preds_scores: torch.Tensor, preds_cls: torch.Tensor, top_k: int):
    """Select, for every class, the indexes of its top k highest-scoring predictions

    Scores are spread into a (n_pred, n_classes) table that is zero everywhere except in the column of each
    prediction's own class, so entries whose score is exactly 0 are never selected.

    :param preds_scores:   The confidence scores, vector of shape (n_pred)
    :param preds_cls:      The predicted class, vector of shape (n_pred)
    :param top_k:          Number of predictions to keep per class, ordered by confidence score

    :return top_k_idx:     Indexes of the top k predictions. length <= (k * n_unique_class)
    """
    # Class ids are assumed to run from 0 up to the largest id present in preds_cls
    highest_cls = torch.max(preds_cls)
    cls_range = torch.arange(highest_cls + 1, device=preds_scores.device)

    # One column per class: keep the score where the prediction belongs to that class, 0 elsewhere
    belongs_to_cls = preds_cls.view(-1, 1) == cls_range.view(1, -1)
    scores_by_cls = preds_scores.view(-1, 1) * belongs_to_cls

    # Rank every column independently, best score first
    ranked_scores, ranked_idx = scores_by_cls.sort(0, descending=True)

    # Within the first top_k rows, keep only the cells that actually hold a score
    kept_cells = ranked_scores[:top_k, :].nonzero(as_tuple=False)
    rows, cols = kept_cells.split(1, dim=1)
    return ranked_idx[rows, cols].view(-1)

matrix_non_max_suppression(pred, conf_thres=0.1, kernel='gaussian', sigma=3.0, max_num_of_detections=500)

Performs Matrix Non-Maximum Suppression (NMS) on inference results https://arxiv.org/pdf/1912.04488.pdf

Parameters:

Name Type Description Default
pred

Raw model prediction (in test mode) - a Tensor of shape [batch, num_predictions, 85] where each item format is (x, y, w, h, object_conf, class_conf, ... 80 classes score ...)

required
conf_thres float

Threshold under which prediction are discarded

0.1
kernel str

Type of kernel to use ['gaussian', 'linear']

'gaussian'
sigma float

Sigma for the gaussian kernel

3.0
max_num_of_detections int

Maximum number of boxes to output

500

Returns:

Type Description
List[torch.Tensor]

Detections list with shape (x1, y1, x2, y2, object_conf, class_conf, class)

Source code in training/utils/detection_utils.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def matrix_non_max_suppression(
    pred,
    conf_thres: float = 0.1,
    kernel: str = "gaussian",
    sigma: float = 3.0,
    max_num_of_detections: int = 500,
) -> List[torch.Tensor]:
    """Performs Matrix Non-Maximum Suppression (NMS) on inference results https://arxiv.org/pdf/1912.04488.pdf

    NOTE: the confidence column and the box columns of `pred` are overwritten in place before the detections
    are re-packed, so the caller's tensor is modified.

    :param pred:        Raw model prediction (in test mode) - a Tensor of shape [batch, num_predictions, 85]
                        where each item format is (x, y, w, h, object_conf, class_conf, ... 80 classes score ...)
    :param conf_thres:  Threshold under which prediction are discarded
    :param kernel:      Type of kernel to use ['gaussian', 'linear']; any value other than 'gaussian' selects the linear kernel
    :param sigma:       Sigma for the gaussian kernel
    :param max_num_of_detections: Maximum number of boxes to output

    :return: One tensor per image in the batch, each row being (x1, y1, x2, y2, confidence, class) where confidence
             is object_conf * class_conf after the Matrix-NMS decay
    """
    # MULTIPLY CONF BY CLASS CONF TO GET COMBINED CONFIDENCE
    class_conf, class_pred = pred[:, :, 5:].max(2)
    pred[:, :, 4] *= class_conf

    # BOX (CENTER X, CENTER Y, WIDTH, HEIGHT) TO (X1, Y1, X2, Y2)
    pred[:, :, :4] = convert_cxcywh_bbox_to_xyxy(pred[:, :, :4])

    # DETECTIONS ORDERED AS (x1y1x2y2, combined_conf, class_pred)
    pred = torch.cat((pred[:, :, :5], class_pred.unsqueeze(2)), 2)

    # SORT DETECTIONS BY DECREASING CONFIDENCE SCORES AND KEEP AT MOST max_num_of_detections OF THEM
    sort_ind = (-pred[:, :, 4]).argsort()
    pred = torch.stack([pred[i, sort_ind[i]] for i in range(pred.shape[0])])[:, 0:max_num_of_detections]

    ious = calc_bbox_iou_matrix(pred)

    # UPPER TRIANGLE ONLY: A BOX IS ONLY COMPARED WITH BOXES RANKED ABOVE IT
    ious = ious.triu(1)

    # CREATE A LABELS MASK, WE WANT ONLY BOXES WITH THE SAME LABEL TO AFFECT EACH OTHER
    labels = pred[:, :, 5:]
    labels_matrix = (labels == labels.transpose(2, 1)).float().triu(1)

    ious *= labels_matrix
    ious_cmax, _ = ious.max(1)
    # KEEP A TRAILING SINGLETON AXIS AND LET BROADCASTING STRETCH IT TO THE ACTUAL NUMBER OF BOXES.
    # Repeating it max_num_of_detections times only matched `ious` when exactly that many boxes were present,
    # and raised a shape mismatch for smaller inputs.
    ious_cmax = ious_cmax.unsqueeze(2)

    if kernel == "gaussian":
        decay_matrix = torch.exp(-1 * sigma * (ious**2))
        compensate_matrix = torch.exp(-1 * sigma * (ious_cmax**2))
        decay, _ = (decay_matrix / compensate_matrix).min(dim=1)
    else:
        decay = (1 - ious) / (1 - ious_cmax)
        decay, _ = decay.min(dim=1)

    pred[:, :, 4] *= decay

    # PER IMAGE, DROP THE DETECTIONS WHOSE DECAYED CONFIDENCE FELL BELOW THE THRESHOLD
    output = [pred[i, pred[i, :, 4] > conf_thres] for i in range(pred.shape[0])]

    return output

non_max_suppression(prediction, conf_thres=0.1, iou_thres=0.6, multi_label_per_box=True, with_confidence=False)

Performs Non-Maximum Suppression (NMS) on inference results.

- prediction: raw model prediction. Should be a list of Tensors of shape (cx, cy, w, h, confidence, cls0, cls1, ...)
- conf_thres: predictions below this confidence threshold are discarded
- iou_thres: IoU threshold for the NMS algorithm
- multi_label_per_box: whether to re-use each box with all possible labels (instead of only the maximum confidence, all confidences above the threshold will be sent to NMS); set to True by default
- with_confidence: whether to multiply the objectness score with the class score. Usually valid for Yolo models only.
- Returns: detections with shape nx6 (x1, y1, x2, y2, confidence, class)

Source code in training/utils/detection_utils.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def non_max_suppression(prediction, conf_thres=0.1, iou_thres=0.6, multi_label_per_box: bool = True, with_confidence: bool = False):
    """
    Performs Non-Maximum Suppression (NMS) on inference results, one image at a time

    :param prediction:          raw model prediction, indexed as [image, box, (cx, cy, w, h, confidence, cls0, cls1, ...)]
    :param conf_thres:          predictions whose confidence is not above this threshold are discarded
    :param iou_thres:           IoU threshold for the nms algorithm
    :param multi_label_per_box: whether to re-use each box with all possible labels (every class score above
                                conf_thres is sent to NMS instead of only the best one); by default is set to True
    :param with_confidence:     whether to multiply objectness score with class score.
                                usually valid for Yolo models only.
    :return: list with one entry per image: a tensor of rows (x1, y1, x2, y2, confidence, class), or None when
             nothing survived the filtering for that image
    """
    passes_conf = prediction[..., 4] > conf_thres  # filter by confidence
    results = [None] * prediction.shape[0]

    for image_idx, image_pred in enumerate(prediction):

        detections = image_pred[passes_conf[image_idx]]  # confident boxes only
        if detections.shape[0] == 0:  # nothing left for this image
            continue

        if with_confidence:
            detections[:, 5:] *= detections[:, 4:5]  # multiply objectness score with class score

        xyxy = convert_cxcywh_bbox_to_xyxy(detections[:, :4])  # cxcywh to xyxy

        # Build the nx6 detections matrix (xyxy, conf, cls)
        if multi_label_per_box:
            # One row per (box, class) pair whose class score clears the threshold
            box_idx, cls_idx = (detections[:, 5:] > conf_thres).nonzero(as_tuple=False).T
            detections = torch.cat((xyxy[box_idx], detections[box_idx, cls_idx + 5, None], cls_idx[:, None].float()), 1)
        else:
            # One row per box, labelled with its best class only
            best_conf, best_cls = detections[:, 5:].max(1, keepdim=True)
            detections = torch.cat((xyxy, best_conf, best_cls.float()), 1)[best_conf.view(-1) > conf_thres]

        if detections.shape[0] == 0:  # nothing left for this image
            continue

        # Apply torch batched NMS algorithm
        boxes, scores, classes = detections[:, :4], detections[:, 4], detections[:, 5]
        kept = torchvision.ops.boxes.batched_nms(boxes, scores, classes, iou_thres)
        results[image_idx] = detections[kept]

    return results

undo_image_preprocessing(im_tensor)

Parameters:

Name Type Description Default
im_tensor torch.Tensor

images in a batch after preprocessing for inference, RGB, (B, C, H, W)

required

Returns:

Type Description
np.ndarray

images in a batch in cv2 format, BGR, (B, H, W, C)

Source code in training/utils/detection_utils.py
352
353
354
355
356
357
358
359
360
def undo_image_preprocessing(im_tensor: torch.Tensor) -> np.ndarray:
    """
    Convert a batch of preprocessed images back to cv2-style uint8 arrays, leaving the input tensor untouched.

    :param im_tensor: images in a batch after preprocessing for inference, RGB, (B, C, H, W)
    :return:          images in a batch in cv2 format, BGR, (B, H, W, C)
    """
    # detach() lets tensors that track gradients be exported; for CPU tensors the array shares the tensor's memory
    im_np = im_tensor.detach().cpu().numpy()

    # Reverse the channel axis (RGB -> BGR), then move channels last
    im_np = im_np[:, ::-1, :, :].transpose(0, 2, 3, 1)

    # Out-of-place multiply: an in-place `*=` would write through the shared memory and scale the caller's tensor
    im_np = im_np * 255.0  # presumably pixel values are in [0, 1] - TODO confirm
    return np.ascontiguousarray(im_np, dtype=np.uint8)

xyxy2cxcywh(bboxes)

Transforms bboxes from xyxy format to center-based (cx, cy, w, h) format

Parameters:

Name Type Description Default
bboxes

array, shaped (nboxes, 4)

required

Returns:

Type Description

modified bboxes

Source code in training/utils/detection_utils.py
584
585
586
587
588
589
590
591
592
593
594
def xyxy2cxcywh(bboxes):
    """
    Converts boxes from corner format (x1, y1, x2, y2) to center format (cx, cy, w, h).
    The conversion is done in place: the input is overwritten and the very same object is returned.
    :param bboxes: array, shaped (nboxes, 4)
    :return: modified bboxes
    """
    # Width / height first: they are needed to derive the centers from the top-left corners
    bboxes[:, 2:4] = bboxes[:, 2:4] - bboxes[:, 0:2]
    # Center = top-left corner + half of the size
    bboxes[:, 0:2] = bboxes[:, 0:2] + bboxes[:, 2:4] * 0.5
    return bboxes

DDPNotSetupException

Bases: Exception

Exception raised when DDP setup is required but was not done

Source code in training/utils/distributed_training_utils.py
403
404
405
406
407
408
409
410
411
412
413
414
class DDPNotSetupException(Exception):
    """Exception raised when DDP setup is required but was not done"""

    def __init__(self):
        # The message doubles as copy-pasteable instructions, so every ">>>" line must be valid Python
        # (a stray quote used to trail the first import line).
        self.message = (
            "Your environment was not setup correctly for DDP.\n"
            "Please run at the beginning of your script:\n"
            ">>> from super_gradients.training.utils.distributed_training_utils import setup_device\n"
            ">>> from super_gradients.common.data_types.enum import MultiGPUMode\n"
            ">>> setup_device(multi_gpu=MultiGPUMode.DISTRIBUTED_DATA_PARALLEL, num_gpus=...)"
        )
        super().__init__(self.message)

compute_precise_bn_stats(model, loader, precise_bn_batch_size, num_gpus)

Parameters:

Name Type Description Default
model nn.Module

The model being trained (ie: Trainer.net)

required
loader torch.utils.data.DataLoader

Training dataloader (ie: Trainer.train_loader)

required
precise_bn_batch_size int

The effective batch size we want to calculate the batchnorm on. For example, if we are training a model on 8 gpus, with a batch of 128 on each gpu, a good rule of thumb would be to give it 8192 (ie: effective_batch_size * num_gpus = batch_per_gpu * num_gpus * num_gpus). If precise_bn_batch_size is not provided in the training_params, the latter heuristic will be taken. Parameter num_gpus (int, required): the number of GPUs we are training on.

required
Source code in training/utils/distributed_training_utils.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@torch.no_grad()
def compute_precise_bn_stats(model: nn.Module, loader: torch.utils.data.DataLoader, precise_bn_batch_size: int, num_gpus: int):
    """
    Recompute the running mean/var of every BatchNorm2d layer of the model as the average of per-batch statistics.

    If no batch can be drawn from the loader, the function returns without touching the model.

    :param model:                   The model being trained (ie: Trainer.net)
    :param loader:                  Training dataloader (ie: Trainer.train_loader)
    :param precise_bn_batch_size:   The effective batch size we want to calculate the batchnorm on. For example, if we are training a model
                                    on 8 gpus, with a batch of 128 on each gpu, a good rule of thumb would be to give it 8192
                                    (ie: effective_batch_size * num_gpus = batch_per_gpu * num_gpus * num_gpus).
                                    If precise_bn_batch_size is not provided in the training_params, the latter heuristic
                                    will be taken.
    :param num_gpus:                The number of gpus we are training on
    """

    # Compute the number of minibatches to use
    num_iter = int(precise_bn_batch_size / (loader.batch_size * num_gpus)) if precise_bn_batch_size else num_gpus
    # Use at least one batch: with zero iterations the accumulators below stay at zero and would be written
    # back into the model as its BN statistics.
    num_iter = min(max(num_iter, 1), len(loader))
    if num_iter <= 0:
        # Empty loader: nothing to average over, keep the current statistics
        return

    # Retrieve the BN layers
    bns = [m for m in model.modules() if isinstance(m, torch.nn.BatchNorm2d)]

    # Initialize BN stats storage for computing mean(mean(batch)) and mean(var(batch))
    running_means = [torch.zeros_like(bn.running_mean) for bn in bns]
    running_vars = [torch.zeros_like(bn.running_var) for bn in bns]

    # Remember momentum values
    momentums = [bn.momentum for bn in bns]

    # Set momentum to 1.0 to compute BN stats that only reflect the current batch
    for bn in bns:
        bn.momentum = 1.0

    try:
        # Average the BN stats for each BN layer over the batches
        for inputs, _labels in itertools.islice(loader, num_iter):
            model(inputs.cuda())
            for i, bn in enumerate(bns):
                running_means[i] += bn.running_mean / num_iter
                running_vars[i] += bn.running_var / num_iter
    finally:
        # Restore original momentum values even if a forward pass failed
        for bn, momentum in zip(bns, momentums):
            bn.momentum = momentum

    # Sync BN stats across GPUs (no reduction if 1 GPU used)
    running_means = scaled_all_reduce(running_means, num_gpus=num_gpus)
    running_vars = scaled_all_reduce(running_vars, num_gpus=num_gpus)

    # Set BN stats
    for i, bn in enumerate(bns):
        bn.running_mean = running_means[i]
        bn.running_var = running_vars[i]

distributed_all_reduce_tensor_average(tensor, n)

This method performs a reduce operation on multiple nodes running distributed training. It first sums all of the results and then divides the sum by the number of nodes.

Parameters:

Name Type Description Default
tensor

The tensor to perform the reduce operation for

required
n

Number of nodes

required

Returns:

Type Description

Averaged tensor from all of the nodes

Source code in training/utils/distributed_training_utils.py
31
32
33
34
35
36
37
38
39
40
41
42
def distributed_all_reduce_tensor_average(tensor, n):
    """
    Average a tensor over the processes taking part in distributed training.
    A copy of the tensor is sum-reduced with all_reduce and then divided by n; the input itself is not modified.
    :param tensor:  The tensor to perform the reduce operation for
    :param n:  Number of nodes
    :return:   Averaged tensor from all of the nodes
    """
    # Work on a clone so the all_reduce does not overwrite the caller's tensor
    reduced = tensor.clone()
    torch.distributed.all_reduce(reduced, op=torch.distributed.ReduceOp.SUM)
    return reduced.div_(n)

get_gpu_mem_utilization()

GPU memory managed by the caching allocator in bytes for a given device.

Source code in training/utils/distributed_training_utils.py
393
394
395
396
397
398
399
400
def get_gpu_mem_utilization():
    """GPU memory managed by the caching allocator in bytes for a given device.

    Uses torch.cuda.memory_reserved when this torch version exposes it, and torch.cuda.memory_cached otherwise.
    """
    # Workaround to work on any torch version: only touch memory_cached when memory_reserved is missing
    query = getattr(torch.cuda, "memory_reserved", None)
    if query is None:
        query = torch.cuda.memory_cached
    return query()

get_local_rank()

Returns the local rank if running in DDP, and 0 otherwise

Returns:

Type Description

local rank

Source code in training/utils/distributed_training_utils.py
146
147
148
149
150
151
def get_local_rank() -> int:
    """
    Returns the local rank if running in DDP, and 0 otherwise

    The distributed package is checked for availability before it is queried, like get_world_size() does.
    NOTE(review): the value comes from dist.get_rank(), i.e. the rank inside the whole process group; presumably
    this matches the per-node local rank because training runs on a single node - confirm for multi-node use.

    :return: local rank
    """
    if dist.is_available() and dist.is_initialized():
        return dist.get_rank()
    return 0

get_world_size()

Returns the world size if running in DDP, and 1 otherwise

Returns:

Type Description
int

world size

Source code in training/utils/distributed_training_utils.py
162
163
164
165
166
167
168
169
170
171
def get_world_size() -> int:
    """
    Number of processes in the current process group, or 1 when not running in DDP
    (distributed package unavailable, or process group not initialized).
    :return: world size
    """
    running_in_ddp = dist.is_available() and dist.is_initialized()
    return dist.get_world_size() if running_in_ddp else 1

initialize_ddp()

Initialize Distributed Data Parallel

Important note: in distributed training it is customary to specify learning rates and batch sizes per GPU. Whatever learning rate and schedule you specify will be applied to each GPU individually. Since gradients are passed and summed (reduced) from all to all GPUs, the effective batch size is the batch you specify times the number of GPUs. In the literature there are several "best practices" to set learning rates and schedules for large batch sizes.

Source code in training/utils/distributed_training_utils.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def initialize_ddp():
    """
    Initialize Distributed Data Parallel for the current process

    Creates the default process group when it does not exist yet, binds the process to the CUDA device matching
    its assigned rank and records that device in device_config.

    Important note: in distributed training it is customary to specify learning rates and batch sizes per GPU.
    Whatever learning rate and schedule you specify will be applied to each GPU individually.
    Since gradients are passed and summed (reduced) from all to all GPUs, the effective batch size is the
    batch you specify times the number of GPUs. In the literature there are several "best practices" to set
    learning rates and schedules for large batch sizes.
    """

    # Every process except the one with assigned rank 0 is muted
    if device_config.assigned_rank > 0:
        mute_current_process()

    logger.info("Distributed training starting...")

    if not torch.distributed.is_initialized():
        # gloo is selected on Windows (os.name == "nt"), nccl everywhere else
        torch.distributed.init_process_group(backend="gloo" if os.name == "nt" else "nccl", init_method="env://")

    torch.cuda.set_device(device_config.assigned_rank)

    is_rank_zero = torch.distributed.get_rank() == 0
    if is_rank_zero:
        logger.info(f"Training in distributed mode... with {str(torch.distributed.get_world_size())} GPUs")

    device_config.device = "cuda:%d" % device_config.assigned_rank

reduce_results_tuple_for_ddp(validation_results_tuple, device)

Gather all validation tuples from the various devices and average them

Source code in training/utils/distributed_training_utils.py
45
46
47
48
49
50
51
52
53
54
55
def reduce_results_tuple_for_ddp(validation_results_tuple, device):
    """Gather all validation tuples from the various devices and average them

    Every item is turned into a fresh tensor (tensors are cloned and detached, other values are wrapped with
    torch.tensor), moved to `device` and averaged over the world size.

    :param validation_results_tuple: Validation results of the current process
    :param device:                   Device the tensors are moved to before being reduced
    :return:                         Tuple with the averaged results, in the same order
    """
    averaged_results = []
    for result in list(validation_results_tuple):
        # Never reduce the caller's own tensor: work on a detached copy
        as_tensor = result.clone().detach() if torch.is_tensor(result) else torch.tensor(result)
        averaged = distributed_all_reduce_tensor_average(tensor=as_tensor.to(device), n=torch.distributed.get_world_size())
        averaged_results.append(averaged)
    return tuple(averaged_results)

restart_script_with_ddp(num_gpus=None)

Launch the same script as the one that was launched (i.e. the command used to start the current process is re-used) but on subprocesses (i.e. with DDP).

Parameters:

Name Type Description Default
num_gpus int

How many gpu's you want to run the script on. If not specified, every available device will be used.

None
Source code in training/utils/distributed_training_utils.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@record
def restart_script_with_ddp(num_gpus: int = None):
    """Re-run the current command line (sys.executable + sys.argv, followed by EXTRA_ARGS) on subprocesses (i.e. with DDP).

    :param num_gpus: How many gpu's you want to run the script on. If not specified, every available device will be used.
    :raises ValueError: If more GPUs are requested than torch.cuda.device_count() reports
    """
    ddp_port = find_free_port()

    # Get the value fom recipe if specified, otherwise take all available devices.
    if num_gpus is None:
        num_gpus = torch.cuda.device_count()
    if num_gpus > torch.cuda.device_count():
        raise ValueError(f"You specified num_gpus={num_gpus} but only {torch.cuda.device_count()} GPU's are available")

    logger.info(
        "Launching DDP with:\n"
        f"   - ddp_port = {ddp_port}\n"
        f"   - num_gpus = {num_gpus}/{torch.cuda.device_count()} available\n"
        "-------------------------------------\n"
    )

    # Single-node launch: one worker process per requested GPU, rendezvous on the local free port
    launch_settings = dict(
        nproc_per_node=num_gpus,
        min_nodes=1,
        max_nodes=1,
        run_id="sg_initiated",
        role="default",
        rdzv_endpoint=f"127.0.0.1:{ddp_port}",
        rdzv_backend="static",
        rdzv_configs={"rank": 0, "timeout": 900},
        rdzv_timeout=-1,
        max_restarts=0,
        monitor_interval=5,
        start_method="spawn",
        log_dir=None,
        redirects=Std.NONE,
        tee=Std.NONE,
        metrics_cfg={},
    )
    config = LaunchConfig(**launch_settings)

    launcher = elastic_launch(config=config, entrypoint=sys.executable)
    launcher(*sys.argv, *EXTRA_ARGS)

    # The code below should actually never be reached as the process will be in a loop inside elastic_launch until any subprocess crashes.
    sys.exit(0)

scaled_all_reduce(tensors, num_gpus)

Performs the scaled all_reduce operation on the provided tensors. The input tensors are modified in-place. Currently supports only the sum reduction operator. The reduced values are scaled by the inverse size of the process group (equivalent to num_gpus).

Source code in training/utils/distributed_training_utils.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def scaled_all_reduce(tensors: torch.Tensor, num_gpus: int):
    """
    Sum-reduce every tensor of `tensors` across the process group, then scale it by 1 / num_gpus.

    `tensors` is iterated over (one all_reduce per item) and each item is modified in-place.
    Currently supports only the sum reduction operator.
    With num_gpus == 1 the input is returned as is, without any communication.

    :param tensors:  The tensors to reduce
    :param num_gpus: Size of the process group, used as the scaling denominator
    :return:         The same `tensors` object that was passed in
    """
    # There is no need for reduction in the single-proc case
    if num_gpus == 1:
        return tensors

    # Launch all the reductions asynchronously so that they can overlap...
    pending = [torch.distributed.all_reduce(tensor, async_op=True) for tensor in tensors]

    # ...then block until every one of them is done
    for handle in pending:
        handle.wait()

    # Turn the sums into averages
    for tensor in tensors:
        tensor.mul_(1.0 / num_gpus)

    return tensors

setup_cpu(multi_gpu=MultiGPUMode.AUTO, num_gpus=None)

Parameters:

Name Type Description Default
multi_gpu MultiGPUMode

DDP, DP, Off or AUTO

MultiGPUMode.AUTO
num_gpus int

Number of GPU's to use.

None
Source code in training/utils/distributed_training_utils.py
245
246
247
248
249
250
251
252
253
254
255
256
257
def setup_cpu(multi_gpu: MultiGPUMode = MultiGPUMode.AUTO, num_gpus: int = None):
    """
    Configure device_config for CPU execution, after checking that the arguments do not ask for GPUs.

    :param multi_gpu:    DDP, DP, Off or AUTO
    :param num_gpus:     Number of GPU's to use.
    :raises ValueError:  If multi_gpu is neither OFF nor AUTO, or if num_gpus is neither 0 nor None
    """
    cpu_compatible_modes = (MultiGPUMode.OFF, MultiGPUMode.AUTO)
    if multi_gpu not in cpu_compatible_modes:
        raise ValueError(f"device='cpu' and multi_gpu={multi_gpu} are not compatible together.")

    cpu_compatible_gpu_counts = (0, None)
    if num_gpus not in cpu_compatible_gpu_counts:
        raise ValueError(f"device='cpu' and num_gpus={num_gpus} are not compatible together.")

    # On CPU the multi-GPU mode is always recorded as OFF, even when AUTO was requested
    device_config.multi_gpu = MultiGPUMode.OFF
    device_config.device = "cpu"

setup_device(multi_gpu=MultiGPUMode.AUTO, num_gpus=None, device='cuda')

If required, launch ddp subprocesses.

Parameters:

Name Type Description Default
multi_gpu MultiGPUMode

DDP, DP, Off or AUTO

MultiGPUMode.AUTO
num_gpus int

Number of GPU's to use. When None, use all available devices on DDP or only one device on DP/OFF.

None
device str

The device you want to use ('cpu' or 'cuda') If you only set num_gpus, your device will be set up according to the following logic: - setup_device(num_gpus=0) => gpu_mode='OFF' and device='cpu' - setup_device(num_gpus=1) => gpu_mode='OFF' and device='gpu' - setup_device(num_gpus>=2) => gpu_mode='DDP' and device='gpu' - setup_device(num_gpus=-1) => gpu_mode='DDP' and device='gpu' and num_gpus=<N-AVAILABLE-GPUs>

'cuda'
Source code in training/utils/distributed_training_utils.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
@resolve_param("multi_gpu", TypeFactory(MultiGPUMode.dict()))
def setup_device(multi_gpu: MultiGPUMode = MultiGPUMode.AUTO, num_gpus: int = None, device: str =