-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_proxy.py
More file actions
3068 lines (2474 loc) · 118 KB
/
api_proxy.py
File metadata and controls
3068 lines (2474 loc) · 118 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""API Proxy for Vigint services - handles routing and load balancing"""
import requests
import logging
import time
from flask import Flask, request, jsonify, Response
from urllib.parse import urljoin
from auth import require_api_key, require_api_key_flexible
from vigint.models import db, APIUsage
from config import config
import google.generativeai as genai
import base64
import cv2
import numpy as np
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import tempfile
import os
import shutil
import threading
import atexit
from collections import deque
import uuid
# Flask application setup — all values come from the shared `config` object.
app = Flask(__name__)
app.config['SECRET_KEY'] = config.secret_key
app.config['SQLALCHEMY_DATABASE_URI'] = config.database_url
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Initialize database with app
db.init_app(app)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Reduce werkzeug (Flask) logging verbosity
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.WARNING)

# Configure Gemini AI (server-side only).
# The model handles stay None until initialize_gemini_models() succeeds;
# callers must check for None before use.
gemini_api_key = config.gemini_api_key
gemini_model_short = None
gemini_model_long = None
def initialize_gemini_models():
    """Initialize Gemini models with fallback - called lazily to avoid startup crashes.

    Populates the module-level ``gemini_model_short`` and ``gemini_model_long``
    handles, trying each candidate model name in order until one loads.

    Returns:
        bool: True when both models were initialized, False otherwise.
    """
    global gemini_model_short, gemini_model_long

    if not gemini_api_key:
        logger.warning("Gemini API key not configured")
        return False

    try:
        genai.configure(api_key=gemini_api_key, transport='rest')

        # Cost optimization: short buffer analyzes 3s, long buffer analyzes
        # 10s only when an incident is detected. Both buffers share the same
        # fallback chain of model names.
        candidates = ['gemini-2.5-flash', 'gemini-pro']

        def first_available(label):
            # Return the first candidate model that instantiates, else None.
            for candidate in candidates:
                try:
                    handle = genai.GenerativeModel(candidate)
                    logger.info(f"✅ Gemini {label} buffer model initialized: {candidate}")
                    return handle
                except Exception as load_error:
                    logger.warning(f"Failed to load {label} buffer model {candidate}: {load_error}")
            return None

        gemini_model_short = first_available('short')
        gemini_model_long = first_available('long')

        if gemini_model_short and gemini_model_long:
            logger.info("✅ Gemini AI configured successfully on server")
            return True

        logger.error("❌ Failed to initialize Gemini models")
        return False
    except Exception as e:
        logger.error(f"❌ Critical error during Gemini initialization: {e}")
        return False
# Try to initialize but don't crash if it fails
try:
    initialize_gemini_models()
except Exception as e:
    logger.error(f"❌ Gemini initialization failed but server will continue: {e}")

# Email configuration (server-side only)
# Read from environment variables first (for Render), then fallback to config file
email_config = {
    'smtp_server': config.email_smtp_server,
    'smtp_port': config.email_smtp_port,
    'username': config.email_username,
    'password': config.email_password,
    'from_email': config.email_from,
    'to_email': config.email_to
}

# Log email configuration status (without exposing sensitive data)
logger.info(f"📧 Email config loaded - Username: {'✅ SET' if email_config['username'] else '❌ NOT SET'}, To: {'✅ SET' if email_config['to_email'] else '❌ NOT SET'}")

# Video analysis configuration (server-side only)
# Durations are in seconds; frame buffers below are sized as duration * fps.
video_config = {
    'short_buffer_duration': config.getint('VideoAnalysis', 'short_buffer_duration', 3),
    'long_buffer_duration': config.getint('VideoAnalysis', 'long_buffer_duration', 10),
    'analysis_fps': config.getint('VideoAnalysis', 'analysis_fps', 25),
    'video_format': config.get('VideoAnalysis', 'video_format', 'mp4'),
    'compression_quality': config.getfloat('VideoAnalysis', 'compression_quality', 0.85),
    'max_email_size_mb': config.getint('VideoAnalysis', 'max_email_size_mb', 20),
    'preferred_codec': config.get('VideoAnalysis', 'preferred_codec', 'H264')
}

# Global frame buffers for each client (keyed by client ID)
# NOTE(review): get_client_buffer() stores a deque of frames under this key,
# while generate_incident_video() expects a video-metadata dict
# ({'type': 'video', 'video_path': ...}) — confirm which producer runs for a
# given client before relying on the shape.
client_frame_buffers = {}

# Multi-source frame buffers (keyed by client_id -> source_id -> deque)
multi_source_buffers = {}
multi_source_buffer_lock = threading.Lock()

# Temporary file management
temp_video_files = set()  # Track temporary video files for cleanup
temp_file_lock = threading.Lock()  # Thread-safe access to temp files set
def check_disk_space(path=None, min_free_gb=1.0):
    """
    Check available disk space.

    Args:
        path: Path to check (defaults to temp directory)
        min_free_gb: Minimum free space required in GB

    Returns:
        dict: Disk space information and availability status; on failure the
        dict carries 'available': False and an 'error' message instead of
        the size fields.
    """
    if path is None:
        path = tempfile.gettempdir()

    try:
        usage = shutil.disk_usage(path)
        gib = 1024 ** 3  # bytes per GiB
        free_gb = usage.free / gib
        return {
            'available': free_gb >= min_free_gb,
            'free_gb': free_gb,
            'total_gb': usage.total / gib,
            'used_gb': usage.used / gib,
            'path': path,
        }
    except Exception as e:
        logger.error(f"Error checking disk space: {e}")
        return {'available': False, 'error': str(e), 'path': path}
def create_secure_temp_file(suffix='.mp4', prefix='vigint_incident_'):
    """
    Create a secure temporary file for incident videos.

    Args:
        suffix: File extension
        prefix: Filename prefix

    Returns:
        dict: {'success': True, 'path': ..., 'disk_info': ...} on success,
        or {'success': False, 'error': ...} on failure.
    """
    try:
        # Refuse to create the file when the disk is below the free-space floor.
        disk_info = check_disk_space()
        if not disk_info['available']:
            return {
                'success': False,
                'error': f"Insufficient disk space: {disk_info.get('free_gb', 0):.2f} GB available",
                'disk_info': disk_info
            }

        # mkstemp yields an unpredictable name created with owner-only access.
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        os.close(fd)  # only the path is needed; callers reopen the file

        # Register the path so the atexit/emergency cleanup hooks can find it.
        with temp_file_lock:
            temp_video_files.add(temp_path)

        logger.info(f"Created secure temporary file: {temp_path}")
        return {'success': True, 'path': temp_path, 'disk_info': disk_info}
    except Exception as e:
        logger.error(f"Error creating secure temporary file: {e}")
        return {'success': False, 'error': str(e)}
def cleanup_temp_file(file_path):
    """
    Clean up a specific temporary file.

    Args:
        file_path: Path to the temporary file to clean up

    Returns:
        bool: True if cleanup successful
    """
    try:
        if os.path.exists(file_path):
            os.unlink(file_path)
            logger.info(f"Cleaned up temporary file: {file_path}")
        # Untrack the path whether or not the file still existed on disk.
        with temp_file_lock:
            temp_video_files.discard(file_path)
        return True
    except Exception as e:
        logger.error(f"Error cleaning up temporary file {file_path}: {e}")
        return False
def cleanup_all_temp_files():
    """
    Clean up all tracked temporary video files.

    Returns:
        dict: Cleanup statistics ('cleaned', 'failed', 'total')
    """
    cleaned = 0
    failed = 0

    # Snapshot and reset the tracking set under the lock so concurrent
    # writers never see us iterating the live set.
    with temp_file_lock:
        pending = temp_video_files.copy()
        temp_video_files.clear()

    for path in pending:
        try:
            if os.path.exists(path):
                os.unlink(path)
                cleaned += 1
                logger.debug(f"Cleaned up temporary file: {path}")
        except Exception as e:
            failed += 1
            logger.warning(f"Failed to clean up temporary file {path}: {e}")

    if cleaned > 0 or failed > 0:
        logger.info(f"Temporary file cleanup: {cleaned} cleaned, {failed} failed")

    return {'cleaned': cleaned, 'failed': failed, 'total': cleaned + failed}
def cleanup_old_temp_files(max_age_hours=24):
    """
    Clean up old temporary files based on age.

    Scans the system temp directory for incident videos
    (vigint_incident_*.mp4) older than the cutoff, deletes them, and
    untracks any that were still registered.

    Args:
        max_age_hours: Maximum age in hours before cleanup

    Returns:
        dict: Cleanup statistics, or {'error': ...} if the directory scan
        itself failed.
    """
    cleaned_count = 0
    failed_count = 0
    # FIX: dropped the redundant function-local `import time` — the module
    # already imports time at the top of the file.
    current_time = time.time()
    max_age_seconds = max_age_hours * 3600
    temp_dir = tempfile.gettempdir()

    try:
        for filename in os.listdir(temp_dir):
            # Only touch files this service created.
            if filename.startswith('vigint_incident_') and filename.endswith('.mp4'):
                file_path = os.path.join(temp_dir, filename)
                try:
                    file_age = current_time - os.path.getmtime(file_path)
                    if file_age > max_age_seconds:
                        os.unlink(file_path)
                        cleaned_count += 1
                        logger.debug(f"Cleaned up old temporary file: {file_path}")
                        # Remove from tracking set if present
                        with temp_file_lock:
                            temp_video_files.discard(file_path)
                except Exception as e:
                    failed_count += 1
                    logger.warning(f"Failed to clean up old file {file_path}: {e}")
    except Exception as e:
        logger.error(f"Error during old file cleanup: {e}")
        return {'error': str(e)}

    if cleaned_count > 0 or failed_count > 0:
        logger.info(f"Old file cleanup: {cleaned_count} cleaned, {failed_count} failed")

    return {
        'cleaned': cleaned_count,
        'failed': failed_count,
        'total': cleaned_count + failed_count
    }
def monitor_storage_usage():
    """
    Monitor storage usage and perform cleanup if needed.

    Returns:
        dict: Storage monitoring results with a 'status' of 'ok',
        'cleanup_performed', or 'emergency_cleanup'.
    """
    # 0.5 GB is the warning threshold — stricter than the 1 GB default used
    # when files are actually created.
    disk_info = check_disk_space(min_free_gb=0.5)
    if disk_info['available']:
        return {'status': 'ok', 'disk_info': disk_info}

    logger.warning(f"Low disk space detected: {disk_info.get('free_gb', 0):.2f} GB available")

    # First pass: aggressively expire anything older than one hour.
    old_cleanup = cleanup_old_temp_files(max_age_hours=1)

    # If that was not enough, drop every tracked temp file.
    disk_info_after_old = check_disk_space(min_free_gb=0.5)
    if not disk_info_after_old['available']:
        all_cleanup = cleanup_all_temp_files()
        disk_info_final = check_disk_space(min_free_gb=0.5)
        return {
            'status': 'emergency_cleanup',
            'old_cleanup': old_cleanup,
            'all_cleanup': all_cleanup,
            'disk_info_initial': disk_info,
            'disk_info_final': disk_info_final
        }

    return {
        'status': 'cleanup_performed',
        'old_cleanup': old_cleanup,
        'disk_info_initial': disk_info,
        'disk_info_after': disk_info_after_old
    }
# Register cleanup function to run on exit so tracked temporary videos are
# removed even on a normal interpreter shutdown.
atexit.register(cleanup_all_temp_files)
def get_client_buffer(client_id):
    """Get or create frame buffer for a client"""
    buffer = client_frame_buffers.get(client_id)
    if buffer is None:
        # Ring buffer sized to hold the full long-analysis window of frames.
        capacity = video_config['long_buffer_duration'] * video_config['analysis_fps']
        buffer = deque(maxlen=capacity)
        client_frame_buffers[client_id] = buffer
    return buffer
def get_multi_source_buffer(client_id, source_id):
    """Get or create frame buffer for a specific video source"""
    with multi_source_buffer_lock:
        # Two-level map: client_id -> source_id -> bounded frame deque.
        source_map = multi_source_buffers.setdefault(client_id, {})
        if source_id not in source_map:
            capacity = video_config['long_buffer_duration'] * video_config['analysis_fps']
            source_map[source_id] = deque(maxlen=capacity)
        return source_map[source_id]
def get_all_client_sources(client_id):
    """Get all video sources for a client"""
    with multi_source_buffer_lock:
        # Unknown clients simply have no sources.
        return list(multi_source_buffers.get(client_id, {}).keys())
def validate_video_format(video_format):
    """Validate video format and return appropriate codec"""
    # Supported container -> fourcc codec mapping.
    format_codecs = {
        'mp4': 'mp4v',
        'avi': 'XVID',
        'mov': 'mp4v',
        'mkv': 'XVID',
    }
    normalized = video_format.lower()
    codec = format_codecs.get(normalized)
    if codec is None:
        logger.warning(f"Unsupported video format: {video_format}, defaulting to mp4")
        return 'mp4', 'mp4v'
    return normalized, codec
def create_video_from_frames(frames, output_path, fps=None, video_format=None, quality_optimization=True):
    """
    Create a video file from a list of frames with configurable settings

    Args:
        frames: List of frame dictionaries with 'frame_data' key
            (base64-encoded JPEG); an optional 'timestamp' string per frame
            is burned into the image as an overlay.
        output_path: Path where video will be saved
        fps: Frames per second (defaults to config value)
        video_format: Video format (defaults to config value)
        quality_optimization: Whether to optimize video quality

    Returns:
        dict: Result with success status and metadata
    """
    if not frames:
        return {'success': False, 'error': 'No frames provided', 'frames_processed': 0}
    # Use configuration defaults if not specified
    if fps is None:
        fps = video_config['analysis_fps']
    if video_format is None:
        video_format = video_config['video_format']
    # Validate format and get codec
    validated_format, codec = validate_video_format(video_format)
    try:
        # Decode and validate first frame; its dimensions fix the video size.
        first_frame_data = base64.b64decode(frames[0]['frame_data'])
        first_frame = cv2.imdecode(np.frombuffer(first_frame_data, np.uint8), cv2.IMREAD_COLOR)
        if first_frame is None:
            return {'success': False, 'error': 'Failed to decode first frame', 'frames_processed': 0}
        height, width, channels = first_frame.shape
        # Quality optimization settings
        if quality_optimization:
            # Ensure dimensions are even (required by some codecs)
            if width % 2 != 0:
                width -= 1
            if height % 2 != 0:
                height -= 1
        # Create video writer with codec options ordered by compatibility
        # Try most compatible codecs first (work without external encoders)
        codec_options = [
            ('mp4v', cv2.VideoWriter_fourcc(*'mp4v')),  # MPEG-4, most compatible
            ('MJPG', cv2.VideoWriter_fourcc(*'MJPG')),  # Motion JPEG, very compatible
            ('XVID', cv2.VideoWriter_fourcc(*'XVID')),  # Xvid codec
            ('avc1', cv2.VideoWriter_fourcc(*'avc1')),  # H264 variant
            ('X264', cv2.VideoWriter_fourcc(*'X264')),  # x264 codec
            (codec, cv2.VideoWriter_fourcc(*codec))  # Original codec as fallback
        ]
        video_writer = None
        used_codec = None
        for codec_name, fourcc in codec_options:
            try:
                video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
                if video_writer.isOpened():
                    used_codec = codec_name
                    logger.info(f"✅ Using {codec_name} codec for video creation")
                    break
                else:
                    # Writer object exists but the codec is unusable; release
                    # it before probing the next candidate.
                    video_writer.release()
                    logger.debug(f"❌ {codec_name} codec not available")
            except Exception as e:
                logger.debug(f"❌ {codec_name} codec failed: {e}")
                continue
        if not video_writer or not video_writer.isOpened():
            logger.error(f"❌ Failed to initialize video writer - tried codecs: {[c[0] for c in codec_options]}")
            return {'success': False, 'error': 'Failed to initialize video writer with any codec', 'frames_processed': 0}
        frames_processed = 0
        failed_frames = 0
        # Write frames to video; a single undecodable frame is skipped and
        # counted rather than aborting the whole video.
        for i, frame_info in enumerate(frames):
            try:
                frame_data = base64.b64decode(frame_info['frame_data'])
                frame = cv2.imdecode(np.frombuffer(frame_data, np.uint8), cv2.IMREAD_COLOR)
                if frame is None:
                    failed_frames += 1
                    logger.warning(f"Failed to decode frame {i}")
                    continue
                # Resize frame if dimensions don't match (quality optimization)
                if quality_optimization and (frame.shape[1] != width or frame.shape[0] != height):
                    frame = cv2.resize(frame, (width, height), interpolation=cv2.INTER_LANCZOS4)
                # Add timestamp overlay if frame has timestamp
                if 'timestamp' in frame_info:
                    timestamp_text = frame_info['timestamp']
                    cv2.putText(frame, timestamp_text, (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                video_writer.write(frame)
                frames_processed += 1
            except Exception as e:
                failed_frames += 1
                logger.warning(f"Error processing frame {i}: {e}")
                continue
        video_writer.release()
        # Verify video was created successfully
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            return {'success': False, 'error': 'Video file not created or empty', 'frames_processed': frames_processed}
        return {
            'success': True,
            'frames_processed': frames_processed,
            'failed_frames': failed_frames,
            'video_path': output_path,
            'duration_seconds': frames_processed / fps,
            'fps': fps,
            'format': validated_format,
            'resolution': f"{width}x{height}",
            'codec_used': used_codec,
            'quality_optimized': quality_optimization
        }
    except Exception as e:
        logger.error(f"Error creating video: {e}")
        return {'success': False, 'error': str(e), 'frames_processed': 0}
def generate_incident_video(client_id, incident_analysis, buffer_type='long'):
    """
    Generate video for incident documentation (use uploaded video directly)

    Args:
        client_id: ID of the client
        incident_analysis: Analysis result containing incident details
        buffer_type: Type of buffer to use ('short' or 'long')

    Returns:
        dict: Result with video path and metadata ('success' is always set).
    """
    try:
        # Get client's video metadata.
        # NOTE(review): this path expects client_frame_buffers[client_id] to be
        # a metadata dict ({'type': 'video', 'video_path': ...}), while
        # get_client_buffer() creates a deque for the same key — confirm the
        # upload endpoint stores the dict form before this runs.
        if client_id not in client_frame_buffers:
            return {'success': False, 'error': 'No video uploaded'}
        video_meta = client_frame_buffers[client_id]
        if not video_meta or video_meta.get('type') != 'video':
            return {'success': False, 'error': 'No video in buffer'}
        source_video_path = video_meta.get('video_path')
        if not source_video_path or not os.path.exists(source_video_path):
            return {'success': False, 'error': 'Video file not found'}

        # Use uploaded video directly for email (no need to regenerate).
        # Just copy it to a new temp file with incident-specific naming.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        risk_level = incident_analysis.get('risk_level', 'UNKNOWN') if incident_analysis else 'UNKNOWN'
        temp_prefix = f"vigint_incident_{client_id}_{risk_level}_{timestamp}_"
        temp_suffix = '.mp4'

        # Check storage and create secure temp file
        storage_status = monitor_storage_usage()
        if storage_status['status'] == 'emergency_cleanup':
            logger.warning("Emergency storage cleanup performed before video generation")
        temp_result = create_secure_temp_file(suffix=temp_suffix, prefix=temp_prefix)
        if not temp_result['success']:
            return temp_result
        video_path = temp_result['path']

        # Copy uploaded video to incident video path.
        # FIX: removed the redundant function-local `import shutil` — shutil
        # is already imported at module level.
        shutil.copy2(source_video_path, video_path)
        logger.info(f"📹 Using uploaded video for incident ({video_meta.get('video_size_mb', 0):.2f} MB)")

        # Return success with video metadata
        result = {
            'success': True,
            'video_path': video_path,
            'frames_processed': video_meta.get('frame_count', 0),
            'fps': video_meta.get('fps', 25),
            'duration': video_meta.get('duration', 0),
            'file_size_mb': video_meta.get('video_size_mb', 0)
        }
        result.update({
            'incident_type': incident_analysis.get('incident_type', 'security_incident') if incident_analysis else 'security_incident',
            'client_id': client_id,
            'risk_level': risk_level,
            'buffer_type': buffer_type,
            'buffer_duration': result['duration'],
            'incident_timestamp': timestamp,
            'analysis_summary': incident_analysis.get('analysis', '') if incident_analysis else ''
        })
        logger.info(f"Generated incident video for client {client_id}: {video_path}")
        return result
    except Exception as e:
        logger.error(f"Error generating incident video: {e}")
        return {'success': False, 'error': str(e)}
def analyze_video_with_gemini(video_path, model, analyze_duration=None, buffer_type="short"):
    """Analyze video file directly with Gemini using File API (NEW APPROACH)

    Args:
        video_path: Path to an existing video file on disk.
        model: Initialized Gemini GenerativeModel to run the analysis with.
        analyze_duration: Optional number of seconds mentioned in the prompt;
            None means the whole video is analyzed.
        buffer_type: Informational label ("short"/"long"); not used in logic.

    Returns:
        dict with incident flags/analysis text, or None on any failure.
    """
    if not model or not video_path or not os.path.exists(video_path):
        return None
    try:
        # Upload video to Gemini File API
        logger.info(f"📤 Uploading video to Gemini File API...")
        uploaded_video = genai.upload_file(video_path)
        logger.info(f"✅ Video uploaded to Gemini: {uploaded_video.name}")

        # Poll until Gemini finishes server-side processing of the upload.
        import time as time_module
        while uploaded_video.state.name == "PROCESSING":
            time_module.sleep(1)
            uploaded_video = genai.get_file(uploaded_video.name)
        if uploaded_video.state.name == "FAILED":
            logger.error("❌ Video processing failed in Gemini")
            return None

        # Build prompt based on duration
        duration_text = f"first {analyze_duration} seconds" if analyze_duration else "entire video"
        prompt = f"""
        Analyze the {duration_text} of this security video for shoplifting incidents in a retail environment.
        IMPORTANT: This is a VIDEO showing behavior over time. Look for MOVEMENT and BEHAVIOR PATTERNS.
        Focus on:
        1. Customers concealing merchandise (watch their MOVEMENTS)
        2. Suspicious handling of items (track HOW they interact over time)
        3. Taking items without paying (follow the PROGRESSION)
        4. Removing security tags or packaging (watch the SEQUENCE)
        Analyze the video to understand the complete context. A single suspicious moment is not enough - look for patterns.
        Return ONLY a JSON object without markdown formatting:
        {{"incident_detected": boolean, "incident_type": string, "description": string, "analysis": string}}
        Answer in French.
        """

        # Generate content with video
        logger.info(f"🎬 Sending video to Gemini {model._model_name} for analysis...")
        response = model.generate_content([uploaded_video, prompt])

        # Cleanup uploaded file (best-effort; a failed delete is non-fatal).
        try:
            genai.delete_file(uploaded_video.name)
            logger.info(f"🗑️ Deleted uploaded video from Gemini")
        except Exception:
            # FIX: narrowed from a bare `except:` so KeyboardInterrupt /
            # SystemExit are not swallowed during cleanup.
            pass

        # Parse JSON response
        try:
            import json
            response_text = response.text.strip()
            # Handle JSON wrapped in markdown code blocks
            if response_text.startswith('```json'):
                start_idx = response_text.find('{')
                end_idx = response_text.rfind('}') + 1
                if start_idx != -1 and end_idx > start_idx:
                    response_text = response_text[start_idx:end_idx]
            analysis_json = json.loads(response_text)
            has_security_incident = analysis_json.get('incident_detected', False)
            description = analysis_json.get('description', '')
            analysis_text = analysis_json.get('analysis', '')
            incident_type = analysis_json.get('incident_type', '')
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning(f"Failed to parse JSON response: {e}")
            analysis_text = response.text
            has_security_incident = 'incident_detected": true' in analysis_text.lower()
            incident_type = ""
            # FIX: `description` was left unbound on this fallback path,
            # raising NameError at the return statement below.
            description = ""

        risk_level = "HIGH" if has_security_incident else "LOW"
        logger.info(f"✅ VIDEO analysis: incident_detected={has_security_incident}")
        return {
            'has_security_incident': has_security_incident,
            'incident_detected': has_security_incident,
            'incident_type': incident_type,
            'analysis': analysis_text,
            'risk_level': risk_level,
            'description': description
        }
    except Exception as e:
        logger.error(f"Error analyzing video with Gemini: {e}")
        return None
def analyze_short_video_for_security(frames, buffer_type="short"):
    """Analyze SHORT VIDEO sequence for security incidents using Gemini AI

    Args:
        frames: List of dicts with base64 'frame_data'; the last element is
            expected to carry a 'frame_count' key.
        buffer_type: "short" or "long"; selects which Gemini model to use.

    Returns:
        dict with the analysis result, or None when no model/frames are
        available or the Gemini call fails.
    """
    # Select appropriate model based on buffer type
    if buffer_type == "short":
        model = gemini_model_short
    else:
        model = gemini_model_long
    if not model or not frames:
        return None
    frame_count = frames[-1]['frame_count'] if frames else 0
    try:
        # NOTE(review): the ~seconds figures hard-code 25 fps — presumably
        # this should track video_config['analysis_fps']; confirm.
        prompt = f"""
        Analyze this COMPLETE SHORT VIDEO SEQUENCE ({len(frames)} frames over ~{len(frames)/25:.1f} seconds) for security incidents in a retail environment.
        IMPORTANT: This is a VIDEO showing {len(frames)/25:.1f} seconds of footage. Look for MOVEMENT and BEHAVIOR throughout the ENTIRE duration.
        Focus on shoplifting behavior:
        1. Customers concealing merchandise (watch their MOVEMENTS throughout the video)
        2. Suspicious handling of items (track HOW they interact over time)
        3. Taking items without paying (follow the COMPLETE PROGRESSION)
        4. Removing security tags or packaging (watch the ENTIRE SEQUENCE)
        Analyze the FULL video to understand the complete context. A single suspicious moment is not enough - look for patterns over time.
        Return ONLY a JSON object without markdown formatting:
        {{"incident_detected": boolean, "incident_type": string, "description": string, "analysis": string}}
        Answer in French.
        """
        # Prepare video sequence: the prompt first, then one JPEG part per frame.
        video_parts = [prompt]
        for frame_info in frames:
            video_parts.append({"mime_type": "image/jpeg", "data": frame_info['frame_data']})
        logger.info(f"🎬 Sending {len(frames)} frames to Flash-Lite for SHORT VIDEO analysis...")
        response = model.generate_content(video_parts)
        # Parse JSON response
        try:
            import json
            response_text = response.text.strip()
            # Handle JSON wrapped in markdown code blocks
            if response_text.startswith('```json'):
                start_idx = response_text.find('{')
                end_idx = response_text.rfind('}') + 1
                if start_idx != -1 and end_idx > start_idx:
                    response_text = response_text[start_idx:end_idx]
            analysis_json = json.loads(response_text)
            has_security_incident = analysis_json.get('incident_detected', False)
            description = analysis_json.get('description', '')  # parsed but not returned below
            analysis_text = analysis_json.get('analysis', '')
            incident_type = analysis_json.get('incident_type', '')
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning(f"Failed to parse JSON response, falling back to text analysis: {e}")
            analysis_text = response.text
            analysis_text_lower = analysis_text.lower()
            has_security_incident = 'incident_detected": true' in analysis_text_lower
            # Try to extract incident_type from text
            incident_type = ""
            if '"incident_type":' in analysis_text_lower:
                try:
                    import re
                    match = re.search(r'"incident_type":\s*"([^"]*)"', analysis_text)
                    if match:
                        incident_type = match.group(1)
                except Exception:
                    pass
        # Determine risk level
        risk_level = "HIGH" if has_security_incident else "LOW"
        logger.info(f"✅ SHORT VIDEO analysis: incident_detected={has_security_incident}")
        return {
            'analysis': analysis_text,
            'has_security_incident': has_security_incident,
            'risk_level': risk_level,
            'timestamp': datetime.now().isoformat(),
            'frame_count': frame_count,
            'incident_type': incident_type,
            'video_analysis': True,
            'frames_analyzed': len(frames)
        }
    except Exception as e:
        logger.error(f"Error in short video analysis: {e}")
        return None
def analyze_frame_for_security(frame_base64, frame_count, buffer_type="short"):
    """DEPRECATED: Analyze single frame for security incidents using Gemini AI - use analyze_short_video_for_security instead

    Args:
        frame_base64: Base64-encoded JPEG image data for one frame.
        frame_count: Sequence number of the frame (echoed back in the result).
        buffer_type: "short" or "long"; selects which Gemini model to use.

    Returns:
        dict with the analysis result, a stub dict while the API quota is
        exhausted, or None when no model is configured or the call fails.
    """
    # Select appropriate model based on buffer type
    if buffer_type == "short":
        model = gemini_model_short
    else:
        model = gemini_model_long
    if not model:
        return None
    # Temporary quota check - return mock analysis if quota exceeded.
    # The exhausted hour is remembered as an attribute on this function
    # (set in the except handler below), disabling analysis for the rest of
    # that wall-clock hour.
    import time
    current_hour = int(time.time() / 3600)
    if hasattr(analyze_frame_for_security, 'quota_exceeded_hour') and \
       analyze_frame_for_security.quota_exceeded_hour == current_hour:
        return {
            'analysis': 'Analysis temporarily disabled due to API quota limits',
            'has_security_incident': False,
            'risk_level': "LOW",
            'timestamp': datetime.now().isoformat(),
            'frame_count': frame_count
        }
    try:
        prompt = f"""
        Analyze the provided video carefully for security incidents in a retail environment, with special focus on shoplifting behavior. Pay particular attention to:
        1. Customers taking items and concealing them (in pockets, bags, clothing)
        2. Unusual handling of merchandise (checking for security tags)
        3. Taking items without paying
        4. Removing packaging or security devices
        Return ONLY the JSON object without markdown formatting, code blocks, or additional text.
        If no incidents are detected, return the JSON with incident_detected set to false.
        Your response must be a valid JSON object with the following structure:
        {{"incident_detected": boolean, // true if an incident is detected, false otherwise
        "incident_type": string // Describe the type of incident (e.g.: shoplifting)
        "description": string, // description of what you see
        "analysis": string, // detailed analysis of the video content}}
        Answer in French.
        """
        response = model.generate_content([
            prompt,
            {"mime_type": "image/jpeg", "data": frame_base64}
        ])
        # Parse JSON response
        try:
            import json
            response_text = response.text.strip()
            # Handle JSON wrapped in markdown code blocks
            if response_text.startswith('```json'):
                # Extract JSON from markdown code block
                start_idx = response_text.find('{')
                end_idx = response_text.rfind('}') + 1
                if start_idx != -1 and end_idx > start_idx:
                    response_text = response_text[start_idx:end_idx]
            analysis_json = json.loads(response_text)
            has_security_incident = analysis_json.get('incident_detected', False)
            description = analysis_json.get('description', '')  # parsed but not returned below
            analysis_text = analysis_json.get('analysis', '')
            incident_type = analysis_json.get('incident_type', '')
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning(f"Failed to parse JSON response, falling back to text analysis: {e}")
            analysis_text = response.text
            analysis_text_lower = analysis_text.lower()
            has_security_incident = 'incident_detected": true' in analysis_text_lower
            # Try to extract incident_type from text
            incident_type = ""
            if '"incident_type":' in analysis_text_lower:
                try:
                    import re
                    match = re.search(r'"incident_type":\s*"([^"]*)"', analysis_text)
                    if match:
                        incident_type = match.group(1)
                except Exception:
                    pass
        # Determine risk level based on incident detection
        risk_level = "HIGH" if has_security_incident else "LOW"
        return {
            'analysis': analysis_text,
            'has_security_incident': has_security_incident,
            'risk_level': risk_level,
            'timestamp': datetime.now().isoformat(),
            'frame_count': frame_count,
            'incident_type': incident_type
        }
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error in security analysis: {e}")
        # Check if it's a quota error and mark it so the hourly guard above
        # short-circuits further calls during this hour.
        if "429" in error_msg or "quota" in error_msg.lower():
            analyze_frame_for_security.quota_exceeded_hour = int(time.time() / 3600)
            logger.warning("API quota exceeded, disabling analysis for this hour")
        return None
# Backend service configurations
# Maps a service name (as referenced by the proxy routes) to its local base
# URL and per-request timeout in seconds.
BACKEND_SERVICES = {
    'rtsp': {
        'base_url': 'http://localhost:9997',  # MediaMTX API
        'timeout': 30
    },
    'billing': {
        'base_url': 'http://localhost:5001',  # Billing service
        'timeout': 30
    }
}
def log_api_usage(api_key_id, endpoint, method, status_code, cost=0.001):
    """Log API usage for billing purposes

    Args:
        api_key_id: ID of the API key row the request was authenticated with.
        endpoint: Request path that was served.
        method: HTTP method of the request.
        status_code: HTTP status code returned to the client.
        cost: Billable cost attributed to this call (default 0.001).
    """
    try:
        usage = APIUsage(
            api_key_id=api_key_id,
            endpoint=endpoint,
            method=method,
            status_code=status_code,
            cost=cost
        )
        db.session.add(usage)
        db.session.commit()
    except Exception as e:
        # FIX: roll back the failed transaction so the session is usable for
        # subsequent requests — without this, a failed commit leaves the
        # SQLAlchemy session in a broken state.
        db.session.rollback()
        logger.error(f"Failed to log API usage: {e}")
def proxy_request(service_name, path, method='GET', **kwargs):
"""Proxy a request to a backend service"""
if service_name not in BACKEND_SERVICES:
return None, 404
service_config = BACKEND_SERVICES[service_name]
url = urljoin(service_config['base_url'], path)
timeout = service_config['timeout']
try:
# Prepare request parameters
req_kwargs = {
'timeout': timeout,
'headers': dict(request.headers),
'params': dict(request.args)
}
# Add request body for POST/PUT requests
if method.upper() in ['POST', 'PUT', 'PATCH']:
if request.is_json:
req_kwargs['json'] = request.get_json()
else:
req_kwargs['data'] = request.get_data()