
Pipelines

DetectionPipeline

Bases: Pipeline

Pipeline specifically designed for object detection tasks. The pipeline includes loading images, preprocessing, prediction, and postprocessing.

Parameters:

- model (SgModule, required): The object detection model (an instance of SgModule) used for making predictions.
- class_names (List[str], required): List of class names corresponding to the model's output classes.
- post_prediction_callback (DetectionPostPredictionCallback, required): Callback function to process raw predictions from the model.
- image_processor (Optional[Processing], default None): A single image processor or a list of image processors for preprocessing and postprocessing the images.
- device (Optional[str], default None): The device on which the model will be run. If None, the model's current device is used. Use "cuda" for GPU support.
Source code in training/pipelines/pipelines.py, lines 215–280
class DetectionPipeline(Pipeline):
    """Pipeline specifically designed for object detection tasks.
    The pipeline includes loading images, preprocessing, prediction, and postprocessing.

    :param model:                       The object detection model (instance of SgModule) used for making predictions.
    :param class_names:                 List of class names corresponding to the model's output classes.
    :param post_prediction_callback:    Callback function to process raw predictions from the model.
    :param image_processor:             Single image processor or a list of image processors for preprocessing and postprocessing the images.
    :param device:                      The device on which the model will be run. If None, will run on current model device. Use "cuda" for GPU support.
    """

    def __init__(
        self,
        model: SgModule,
        class_names: List[str],
        post_prediction_callback: DetectionPostPredictionCallback,
        device: Optional[str] = None,
        image_processor: Optional[Processing] = None,
    ):
        super().__init__(model=model, device=device, image_processor=image_processor, class_names=class_names)
        self.post_prediction_callback = post_prediction_callback

    def _decode_model_output(self, model_output: Union[List, Tuple, torch.Tensor], model_input: np.ndarray) -> List[DetectionPrediction]:
        """Decode the model output, by applying post prediction callback. This includes NMS.

        :param model_output:    Direct output of the model, without any post-processing.
        :param model_input:     Model input (i.e. images after preprocessing).
        :return:                Predicted Bboxes.
        """
        post_nms_predictions = self.post_prediction_callback(model_output, device=self.device)

        predictions = []
        for prediction, image in zip(post_nms_predictions, model_input):
            prediction = prediction if prediction is not None else torch.zeros((0, 6), dtype=torch.float32)
            prediction = prediction.detach().cpu().numpy()
            predictions.append(
                DetectionPrediction(
                    bboxes=prediction[:, :4],
                    confidence=prediction[:, 4],
                    labels=prediction[:, 5],
                    bbox_format="xyxy",
                    image_shape=image.shape,
                )
            )

        return predictions

    def _instantiate_image_prediction(self, image: np.ndarray, prediction: DetectionPrediction) -> ImagePrediction:
        return ImageDetectionPrediction(image=image, prediction=prediction, class_names=self.class_names)

    def _combine_image_prediction_to_images(
        self, images_predictions: Iterable[ImageDetectionPrediction], n_images: Optional[int] = None
    ) -> ImagesDetectionPrediction:
        if n_images is not None and n_images == 1:
            # Do not show tqdm progress bar if there is only one image
            images_predictions = [next(iter(images_predictions))]
        else:
            images_predictions = [image_predictions for image_predictions in tqdm(images_predictions, total=n_images, desc="Predicting Images")]

        return ImagesDetectionPrediction(_images_prediction_lst=images_predictions)

    def _combine_image_prediction_to_video(
        self, images_predictions: Iterable[ImageDetectionPrediction], fps: float, n_images: Optional[int] = None
    ) -> VideoDetectionPrediction:
        images_predictions = [image_predictions for image_predictions in tqdm(images_predictions, total=n_images, desc="Predicting Video")]
        return VideoDetectionPrediction(_images_prediction_lst=images_predictions, fps=fps)
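
For orientation, here is a short usage sketch. It assumes a COCO-pretrained detector obtained through the SuperGradients model factory; in typical usage, model.predict(...) is expected to assemble an equivalent DetectionPipeline internally (model, class names, NMS callback, and image processors coming from the checkpoint metadata), so the pipeline rarely needs to be constructed by hand:

from super_gradients.training import models

# Assumed entry point: a small COCO-pretrained YOLOX detector.
model = models.get("yolox_n", pretrained_weights="coco")

# predict() builds a DetectionPipeline under the hood and runs the full
# load -> preprocess -> forward -> NMS -> postprocess chain on the input.
images_predictions = model.predict("path/to/image.jpg")
images_predictions.show()  # draw the predicted boxes on the image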

Pipeline

Bases: ABC

An abstract base class representing a processing pipeline for a specific task. The pipeline includes loading images, preprocessing, prediction, and postprocessing.

Parameters:

- model (SgModule, required): The model used for making predictions.
- image_processor (Union[Processing, List[Processing]], required): A single image processor or a list of image processors for preprocessing and postprocessing the images.
- class_names (List[str], required): List of class names corresponding to the model's output classes.
- device (Optional[str], default None): The device on which the model will be run. If None, the model's current device is used. Use "cuda" for GPU support.
Source code in training/pipelines/pipelines.py, lines 43–212
class Pipeline(ABC):
    """An abstract base class representing a processing pipeline for a specific task.
    The pipeline includes loading images, preprocessing, prediction, and postprocessing.

    :param model:           The model used for making predictions.
    :param image_processor: A single image processor or a list of image processors for preprocessing and postprocessing the images.
    :param class_names:     List of class names corresponding to the model's output classes.
    :param device:          The device on which the model will be run. If None, will run on current model device. Use "cuda" for GPU support.
    """

    def __init__(self, model: SgModule, image_processor: Union[Processing, List[Processing]], class_names: List[str], device: Optional[str] = None):
        super().__init__()
        self.device = device or next(model.parameters()).device
        self.model = model.to(self.device)
        self.class_names = class_names

        if isinstance(image_processor, list):
            image_processor = ComposeProcessing(image_processor)
        self.image_processor = image_processor

    def __call__(self, inputs: Union[str, ImageSource, List[ImageSource]], batch_size: Optional[int] = 32) -> ImagesPredictions:
        """Predict an image or a list of images.

        Supported types include:
            - str:              A string representing either a video, an image or a URL.
            - numpy.ndarray:    A numpy array representing the image
            - torch.Tensor:     A PyTorch tensor representing the image
            - PIL.Image.Image:  A PIL Image object
            - List:             A list of images of any of the above image types (list of videos not supported).

        :param inputs:      inputs to the model, which can be any of the above-mentioned types.
        :param batch_size:  Number of images to be processed at the same time.
        :return:            Results of the prediction.
        """

        if includes_video_extension(inputs):
            return self.predict_video(inputs, batch_size)
        elif check_image_typing(inputs):
            return self.predict_images(inputs, batch_size)
        else:
            raise ValueError(f"Input {inputs} not supported for prediction.")

    def predict_images(self, images: Union[ImageSource, List[ImageSource]], batch_size: Optional[int] = 32) -> ImagesPredictions:
        """Predict an image or a list of images.

        :param images:      Images to predict.
        :param batch_size:  The size of each batch.
        :return:            Results of the prediction.
        """
        from super_gradients.training.utils.media.image import load_images

        images = load_images(images)
        result_generator = self._generate_prediction_result(images=images, batch_size=batch_size)
        return self._combine_image_prediction_to_images(result_generator, n_images=len(images))

    def predict_video(self, video_path: str, batch_size: Optional[int] = 32) -> VideoPredictions:
        """Predict on a video file, by processing the frames in batches.

        :param video_path:  Path to the video file.
        :param batch_size:  The size of each batch.
        :return:            Results of the prediction.
        """
        video_frames, fps = load_video(file_path=video_path)
        result_generator = self._generate_prediction_result(images=video_frames, batch_size=batch_size)
        return self._combine_image_prediction_to_video(result_generator, fps=fps, n_images=len(video_frames))

    def predict_webcam(self) -> None:
        """Predict using webcam"""

        def _draw_predictions(frame: np.ndarray) -> np.ndarray:
            """Draw the predictions on a single frame from the stream."""
            frame_prediction = next(iter(self._generate_prediction_result(images=[frame])))
            return frame_prediction.draw()

        video_streaming = WebcamStreaming(frame_processing_fn=_draw_predictions, fps_update_frequency=1)
        video_streaming.run()

    def _generate_prediction_result(self, images: Iterable[np.ndarray], batch_size: Optional[int] = None) -> Iterable[ImagePrediction]:
        """Run the pipeline on the images as single batch or through multiple batches.

        NOTE: A core motivation to have this function as a generator is that it can be used in a lazy way (if images is generator itself),
              i.e. without having to load all the images into memory.

        :param images:      Iterable of numpy arrays representing images.
        :param batch_size:  The size of each batch.
        :return:            Iterable of Results object, each containing the results of the prediction and the image.
        """
        if batch_size is None:
            yield from self._generate_prediction_result_single_batch(images)
        else:
            for batch_images in generate_batch(images, batch_size):
                yield from self._generate_prediction_result_single_batch(batch_images)

    def _generate_prediction_result_single_batch(self, images: Iterable[np.ndarray]) -> Iterable[ImagePrediction]:
        """Run the pipeline on images. The pipeline is made of 4 steps:
            1. Load images - Loading the images into a list of numpy arrays.
            2. Preprocess - Encode the image in the shape/format expected by the model
            3. Predict - Run the model on the preprocessed image
            4. Postprocess - Decode the output of the model so that the predictions are in the shape/format of original image.

        :param images:  Iterable of numpy arrays representing images.
        :return:        Iterable of Results object, each containing the results of the prediction and the image.
        """
        images = list(images)  # We need to load all the images into memory, and to reuse it afterwards.
        self.model = self.model.to(self.device)  # Make sure the model is on the correct device, as it might have been moved after init

        # Preprocess
        preprocessed_images, processing_metadatas = [], []
        for image in images:
            preprocessed_image, processing_metadata = self.image_processor.preprocess_image(image=image.copy())
            preprocessed_images.append(preprocessed_image)
            processing_metadatas.append(processing_metadata)

        # Predict
        with eval_mode(self.model):
            torch_inputs = torch.Tensor(np.array(preprocessed_images)).to(self.device)
            model_output = self.model(torch_inputs)
            predictions = self._decode_model_output(model_output, model_input=torch_inputs)

        # Postprocess
        postprocessed_predictions = []
        for image, prediction, processing_metadata in zip(images, predictions, processing_metadatas):
            prediction = self.image_processor.postprocess_predictions(predictions=prediction, metadata=processing_metadata)
            postprocessed_predictions.append(prediction)

        # Yield results one by one
        for image, prediction in zip(images, postprocessed_predictions):
            yield self._instantiate_image_prediction(image=image, prediction=prediction)

    @abstractmethod
    def _decode_model_output(self, model_output: Union[List, Tuple, torch.Tensor], model_input: np.ndarray) -> List[Prediction]:
        """Decode the model outputs, move each prediction to numpy and store it in a Prediction object.

        :param model_output:    Direct output of the model, without any post-processing.
        :param model_input:     Model input (i.e. images after preprocessing).
        :return:                Model predictions, without any post-processing.
        """
        raise NotImplementedError

    @abstractmethod
    def _instantiate_image_prediction(self, image: np.ndarray, prediction: Prediction) -> ImagePrediction:
        """Instantiate an object wrapping an image and the pipeline's prediction.

        :param image:       Image to predict.
        :param prediction:  Model prediction on that image.
        :return:            Object wrapping an image and the pipeline's prediction.
        """
        raise NotImplementedError

    @abstractmethod
    def _combine_image_prediction_to_images(self, images_prediction_lst: Iterable[ImagePrediction], n_images: Optional[int] = None) -> ImagesPredictions:
        """Instantiate an object wrapping the list of images and the pipeline's predictions on them.

        :param images_prediction_lst:   List of image predictions.
        :param n_images:                (Optional) Number of images in the list. This is used by the tqdm progress bar to work with iterables, but is not required.
        :return:                        Object wrapping the list of image predictions.
        """
        raise NotImplementedError

    @abstractmethod
    def _combine_image_prediction_to_video(
        self, images_prediction_lst: Iterable[ImagePrediction], fps: float, n_images: Optional[int] = None
    ) -> VideoPredictions:
        """Instantiate an object holding the video frames and the pipeline's predictions on it.

        :param images_prediction_lst:   List of image predictions.
        :param fps:                     Frames per second.
        :param n_images:                (Optional) Number of images in the list. This is used by the tqdm progress bar to work with iterables, but is not required.
        :return:                        Object wrapping the list of image predictions as a Video.
        """
        raise NotImplementedError
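
The lazy behaviour noted in _generate_prediction_result depends on a batching helper that chunks an iterable without materializing it. The helper below is a hypothetical re-implementation for illustration only; the real generate_batch lives elsewhere in the library and may differ:

from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def generate_batch(iterable: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Lazily group an iterable into chunks of at most batch_size items."""
    batch: List[T] = []
    for item in iterable:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, chunk
        yield batch

Because each chunk is yielded as soon as it fills, a stream of frames supplied as a generator never has to be fully loaded before prediction starts.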

__call__(inputs, batch_size=32)

Predict an image or a list of images.

Supported types include:

- str: A string representing either a video, an image, or a URL.
- numpy.ndarray: A numpy array representing the image.
- torch.Tensor: A PyTorch tensor representing the image.
- PIL.Image.Image: A PIL Image object.
- List: A list of images of any of the above image types (lists of videos are not supported).

Parameters:

- inputs (Union[str, ImageSource, List[ImageSource]], required): Inputs to the model; any of the above-mentioned types.
- batch_size (Optional[int], default 32): Number of images to be processed at the same time.

Returns:

- ImagesPredictions: Results of the prediction.

Source code in training/pipelines/pipelines.py, lines 62–82
def __call__(self, inputs: Union[str, ImageSource, List[ImageSource]], batch_size: Optional[int] = 32) -> ImagesPredictions:
    """Predict an image or a list of images.

    Supported types include:
        - str:              A string representing either a video, an image or a URL.
        - numpy.ndarray:    A numpy array representing the image
        - torch.Tensor:     A PyTorch tensor representing the image
        - PIL.Image.Image:  A PIL Image object
        - List:             A list of images of any of the above image types (list of videos not supported).

    :param inputs:      inputs to the model, which can be any of the above-mentioned types.
    :param batch_size:  Number of images to be processed at the same time.
    :return:            Results of the prediction.
    """

    if includes_video_extension(inputs):
        return self.predict_video(inputs, batch_size)
    elif check_image_typing(inputs):
        return self.predict_images(inputs, batch_size)
    else:
        raise ValueError(f"Input {inputs} not supported for prediction.")
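
A sketch of the dispatch, assuming a pipeline instance such as the one built earlier; all paths and the URL below are placeholders:

import numpy as np

pipeline("clip.mp4")                               # video extension -> predict_video
pipeline("photo.jpg")                              # image path -> predict_images
pipeline("https://example.com/cat.png")            # URL string -> predict_images
pipeline(np.zeros((480, 640, 3), dtype=np.uint8))  # raw array -> predict_images
pipeline(["a.jpg", "b.jpg"], batch_size=8)         # list of images, processed in batches of 8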

predict_images(images, batch_size=32)

Predict an image or a list of images.

Parameters:

- images (Union[ImageSource, List[ImageSource]], required): Images to predict.
- batch_size (Optional[int], default 32): The size of each batch.

Returns:

- ImagesPredictions: Results of the prediction.

Source code in training/pipelines/pipelines.py, lines 84–95
def predict_images(self, images: Union[ImageSource, List[ImageSource]], batch_size: Optional[int] = 32) -> ImagesPredictions:
    """Predict an image or a list of images.

    :param images:      Images to predict.
    :param batch_size:  The size of each batch.
    :return:            Results of the prediction.
    """
    from super_gradients.training.utils.media.image import load_images

    images = load_images(images)
    result_generator = self._generate_prediction_result(images=images, batch_size=batch_size)
    return self._combine_image_prediction_to_images(result_generator, n_images=len(images))
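
Note that load_images materializes every input image before batching, so batch_size bounds the size of each forward pass (and hence GPU memory), not host memory. A short hedged sketch with placeholder file names; the show helper on the returned object is an assumption:

predictions = pipeline.predict_images(["a.jpg", "b.jpg", "c.jpg"], batch_size=2)
predictions.show()  # assumed convenience method on the returned ImagesPredictions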

predict_video(video_path, batch_size=32)

Predict on a video file by processing its frames in batches.

Parameters:

- video_path (str, required): Path to the video file.
- batch_size (Optional[int], default 32): The size of each batch.

Returns:

- VideoPredictions: Results of the prediction.

Source code in training/pipelines/pipelines.py, lines 97–106
def predict_video(self, video_path: str, batch_size: Optional[int] = 32) -> VideoPredictions:
    """Predict on a video file, by processing the frames in batches.

    :param video_path:  Path to the video file.
    :param batch_size:  The size of each batch.
    :return:            Results of the prediction.
    """
    video_frames, fps = load_video(file_path=video_path)
    result_generator = self._generate_prediction_result(images=video_frames, batch_size=batch_size)
    return self._combine_image_prediction_to_video(result_generator, fps=fps, n_images=len(video_frames))
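
Because load_video reads every frame into memory up front (which is how n_images is known), long or high-resolution videos can be memory-hungry regardless of batch_size. A hedged sketch with placeholder paths; the save helper on the returned object is an assumption:

video_predictions = pipeline.predict_video("input.mp4", batch_size=16)
video_predictions.save("output.mp4")  # assumed helper: writes the annotated frames back out at the original fps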

predict_webcam()

Predict using the webcam.

Source code in training/pipelines/pipelines.py, lines 108–117
def predict_webcam(self) -> None:
    """Predict using webcam"""

    def _draw_predictions(frame: np.ndarray) -> np.ndarray:
        """Draw the predictions on a single frame from the stream."""
        frame_prediction = next(iter(self._generate_prediction_result(images=[frame])))
        return frame_prediction.draw()

    video_streaming = WebcamStreaming(frame_processing_fn=_draw_predictions, fps_update_frequency=1)
    video_streaming.run()
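
Usage is a single blocking call, sketched below for a pipeline instance like the one built earlier; the exact stop condition (for example, closing the stream window) is an assumption about WebcamStreaming:

pipeline.predict_webcam()  # opens the default webcam and draws predictions on each frame until the stream is stopped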

eval_mode(model)

Set a model to evaluation mode and deactivate gradient computation; both changes are undone on exit.

Parameters:

- model (SgModule, required): The model to set in evaluation mode.
Source code in training/pipelines/pipelines.py, lines 30–40
@contextmanager
def eval_mode(model: SgModule) -> None:
    """Set a model in evaluation mode and deactivate gradient computation, undo at the end.

    :param model: The model to set in evaluation mode.
    """
    _starting_mode = model.training
    model.eval()
    with torch.no_grad():
        yield
    model.train(mode=_starting_mode)
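
A minimal sketch of using eval_mode directly, assuming it can be imported from the module shown above; a plain nn.Module stands in for an SgModule, since the context manager only touches the training flag and autograd:

import torch
from torch import nn

from super_gradients.training.pipelines.pipelines import eval_mode  # assumed import path

model = nn.Sequential(nn.Linear(4, 2), nn.Dropout(p=0.5))  # stand-in for an SgModule
model.train()

with eval_mode(model):                # dropout disabled, no autograd graph recorded
    out = model(torch.randn(1, 4))
    assert not out.requires_grad

assert model.training                 # the original training mode is restored on exit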