Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions videodb/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,93 @@ def index_scenes(
return None
return scenes_data.get("scene_index_id")

def index_visuals(
self,
prompt: Optional[str] = None,
batch_config: Optional[Dict] = None,
model_name: Optional[str] = None,
model_config: Optional[Dict] = None,
name: Optional[str] = None,
callback_url: Optional[str] = None,
) -> Optional[str]:
"""Index visuals (scenes) from the video.

:param str prompt: Prompt for scene description
:param dict batch_config: Frame extraction config with keys:
- "type": Extraction type ("time" or "shot"). Default is "time".
- "value": Window size in seconds (for time) or threshold (for shot). Default is 10.
- "frame_count": Number of frames to extract per window. Default is 1.
- "select_frames": Which frames to select (e.g., ["first", "middle", "last"]). Default is ["first"].
:param str model_name: Name of the model
:param dict model_config: Configuration for the model
:param str name: Name of the visual index
:param str callback_url: URL to receive the callback (optional)
:return: The scene index id
:rtype: str
"""
if batch_config is None:
batch_config = {"type": "time", "value": 10, "frame_count": 1}

extraction_type = batch_config.get("type", "time")
if extraction_type == "shot":
extraction_type = SceneExtractionType.shot_based
extraction_config = {
"threshold": batch_config.get("value", 20),
"frame_count": batch_config.get("frame_count", 1),
}
else:
extraction_type = SceneExtractionType.time_based
extraction_config = {
"time": batch_config.get("value", 10),
"frame_count": batch_config.get("frame_count", 1),
"select_frames": batch_config.get("select_frames", ["first"]),
}

scenes_data = self._connection.post(
path=f"{ApiPath.video}/{self.id}/{ApiPath.index}/{ApiPath.scene}",
data={
"extraction_type": extraction_type,
"extraction_config": extraction_config,
"prompt": prompt,
"model_name": model_name,
"model_config": model_config or {},
"name": name,
"callback_url": callback_url,
},
)
if not scenes_data:
return None
return scenes_data.get("scene_index_id")

def index_audio(
self,
language_code: Optional[str] = None,
segmentation_type: Optional[SegmentationType] = SegmentationType.sentence,
force: bool = False,
callback_url: str = None,
) -> None:
"""Index audio (spoken words) in the video.

:param str language_code: (optional) Language code of the video
:param SegmentationType segmentation_type: (optional) Segmentation type used for indexing, :class:`SegmentationType <SegmentationType>` object
:param bool force: (optional) Force to index the video
:param str callback_url: (optional) URL to receive the callback
:raises InvalidRequestError: If the video is already indexed
:return: None if the indexing is successful
:rtype: None
"""
self._connection.post(
path=f"{ApiPath.video}/{self.id}/{ApiPath.index}",
data={
"index_type": IndexType.spoken_word,
"language_code": language_code,
"segmentation_type": segmentation_type,
"force": force,
"callback_url": callback_url,
},
show_progress=True,
)

def list_scene_index(self) -> List:
"""List all the scene indexes.

Expand Down