133 lines
4.2 KiB
Python
133 lines
4.2 KiB
Python
import io
|
|
import sys
|
|
|
|
import ffmpeg
|
|
import os
|
|
import tempfile
|
|
from PIL import Image
|
|
from pydub import AudioSegment
|
|
from pydub.playback import play
|
|
|
|
|
|
def old_test():
    """Save one frame of a hard-coded RTSP stream to disk and inspect it.

    The frame is written to ``test.jpg`` in the system temp directory, the
    histogram of each of its three colour channels is printed, and the image
    is opened in the default viewer. Intended for manual experimentation.
    """
    snapshot_path = os.path.sep.join([tempfile.gettempdir(), "test.jpg"])
    print(snapshot_path)

    # Start from a clean slate: drop any snapshot left by a previous run.
    snapshot_exists = os.path.exists(snapshot_path)
    if snapshot_exists:
        os.remove(snapshot_path)

    # Variants tried during experimentation:
    #   input:  'rtsp://172.22.246.207/extron3'
    #   output: .output('-', format='h264')
    stream = ffmpeg.input('rtsp://172.22.246.207/extron1')
    single_frame = stream.output(snapshot_path, vframes=1)
    single_frame.run(capture_stdout=True)

    image = Image.open(snapshot_path)
    # Unpacking into exactly three names fails for images that do not have
    # three channels.
    red, green, blue = image.split()
    for channel in (red, green, blue):
        print(channel.histogram())
    image.show()
|
|
|
|
|
|
# old_test()
|
|
|
|
def is_single_color_image(image):
    """Tell whether every pixel of *image* has the same colour.

    Each channel returned by ``image.split()`` is inspected through its
    ``histogram()``: a channel is uniform when exactly one histogram bin is
    non-zero.

    Returns:
        A ``(is_single_color, color)`` tuple. ``color`` maps the channel
        index (0, 1, 2, ...) to the single occupied histogram bin of that
        channel. When the image is not single-coloured, ``color`` only holds
        the channels examined before the first non-uniform one.

    Raises:
        IndexError: if a channel's histogram has no non-zero bin at all.
    """
    channel_levels = {}
    for index, channel in enumerate(image.split()):  # r, g, b
        occupied_bins = [
            level
            for level, pixel_count in enumerate(channel.histogram())
            if pixel_count != 0
        ]
        if len(occupied_bins) > 1:
            # More than one intensity in this channel: no need to look further.
            return False, channel_levels
        channel_levels[index] = occupied_bins[0]
    return True, channel_levels
|
|
|
|
|
|
def check_frame_is_valid(stream_url, raise_errors=True):
    """Grab one video frame from *stream_url* and reject flat-colour frames.

    ffmpeg is asked for a single MJPEG-encoded frame on its stdout; the frame
    is decoded with PIL and passed to ``is_single_color_image``.

    Args:
        stream_url: Input passed to ``ffmpeg.input``.
        raise_errors: When true, an ffmpeg failure is raised as ``Exception``;
            otherwise it is reported through the return value.

    Returns:
        ``(ok, message)``. ``ok`` is ``False`` when the frame consists of a
        single colour (with a dedicated message for an all-zero, i.e. black,
        frame) or when ffmpeg failed and ``raise_errors`` is false.

    Raises:
        Exception: ffmpeg failed and ``raise_errors`` is true. The original
            ``ffmpeg.Error`` is attached as ``__cause__``.

    Errors raised while decoding or analysing the frame are not handled here.
    """
    try:
        frame, _ = (
            ffmpeg
            .input(stream_url)
            .output('pipe:', vframes=1, format='image2', vcodec='mjpeg')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        msg = "Could not connect to stream URL or other error!"
        # Decode defensively: stderr may contain bytes that are not valid
        # UTF-8, and building the error message must never fail itself.
        stderr_text = (e.stderr or b"").decode('utf-8', errors='replace').strip()
        if stderr_text:
            # The last stderr line usually carries ffmpeg's actual complaint.
            msg += " ({})".format(stderr_text.split("\n")[-1])
        if raise_errors:
            raise Exception(msg) from e
        return False, msg

    image = Image.open(io.BytesIO(frame))
    single_color_image, color = is_single_color_image(image)
    if not single_color_image:
        # NOTE(review): opens the frame in the default image viewer; looks
        # like a debugging aid -- confirm before using this unattended.
        image.show()
        return True, "all good :-)"

    # presumably the channels are R, G, B in that order -- the message says so
    color_text = ':'.join(str(x) for x in color.values())
    if all(x == 0 for x in color.values()):
        return False, "Image is entirely black! (color: {} (RGB))".format(color_text)
    return False, "Image has only one single color! (color: {} (RGB))".format(color_text)
|
|
|
|
|
|
# print(check_frame_is_valid('rtsp://172.22.246.207/extron2'))
|
|
|
|
def check_if_audio_is_valid(stream_url, sample_length_sec=3, lower_alert_limit_dBFS=-40.0, raise_errors=True):
    """Record a short audio sample from *stream_url* and check its peak level.

    ffmpeg writes ``sample_length_sec`` seconds of the input to a temporary
    ``.aac`` file, which is loaded with pydub, played back, and judged by its
    ``max_dBFS``.

    Args:
        stream_url: Input passed to ``ffmpeg.input``.
        sample_length_sec: Duration handed to ffmpeg as the ``t`` input option.
        lower_alert_limit_dBFS: Peak levels below this value are reported as
            "very low volume".
        raise_errors: When true, an ffmpeg failure is raised as ``Exception``;
            otherwise it is reported through the return value.

    Returns:
        ``(ok, message)``. ``ok`` is ``False`` when the peak level is
        ``-inf`` dBFS (no signal), when it is below
        ``lower_alert_limit_dBFS``, or when ffmpeg failed and
        ``raise_errors`` is false.

    Raises:
        Exception: ffmpeg failed and ``raise_errors`` is true. The original
            ``ffmpeg.Error`` is attached as ``__cause__``.
    """
    # A private scratch directory gives ffmpeg a path that does not exist yet
    # (so there is nothing to overwrite), cannot be claimed by another process
    # in the meantime, and is removed again when the block exits -- the sample
    # file no longer piles up in the temp directory.
    with tempfile.TemporaryDirectory() as scratch_dir:
        file_name = os.path.join(scratch_dir, "sample.aac")
        try:
            (
                ffmpeg
                .input(stream_url, t=sample_length_sec)
                .output(file_name)
                .run(capture_stdout=True, capture_stderr=True)
            )
        except ffmpeg.Error as e:
            msg = "Could not connect to stream URL or other error!"
            # Decode defensively: stderr may contain bytes that are not valid
            # UTF-8, and building the error message must never fail itself.
            stderr_text = (e.stderr or b"").decode('utf-8', errors='replace').strip()
            if stderr_text:
                # The last stderr line usually carries ffmpeg's actual complaint.
                msg += " ({})".format(stderr_text.split("\n")[-1])
            if raise_errors:
                raise Exception(msg) from e
            return False, msg

        # Load the sample while the scratch directory still exists.
        sound = AudioSegment.from_file(file_name, "aac")

    # NOTE(review): plays the sample on the local audio device; looks like a
    # debugging aid -- confirm before using this unattended.
    play(sound)

    peak_dBFS = sound.max_dBFS
    if peak_dBFS == -float('inf'):
        return False, "No active audio signal detected!"
    if peak_dBFS < lower_alert_limit_dBFS:
        return False, "Very low volume (< {} dBFS) detected! ({})".format(lower_alert_limit_dBFS, peak_dBFS)

    return True, "good audio signal detected! ({} max dBFS in {}s sample)".format(peak_dBFS, sample_length_sec)
|
|
|
|
# Ad-hoc manual check against a hard-coded stream. This statement sits at
# module level, so it runs every time the file is executed or imported.
# NOTE(review): consider moving it under `if __name__ == "__main__":` if this
# module is ever imported from other code.
print(check_if_audio_is_valid('rtsp://172.22.246.207/extron1'))
|
|
|
|
|
|
def check_if_audio_is_valid_stream(stream_url, raise_errors=True):
    """Pull two seconds of audio from *stream_url* through a pipe and play it.

    Unlike the file-based variant, the audio never touches the disk: ffmpeg
    writes an ADTS stream to its stdout, which is loaded into pydub from
    memory.

    Args:
        stream_url: Input passed to ``ffmpeg.input``.
        raise_errors: Currently ignored; failures always propagate as raised
            by ffmpeg / pydub. Kept so the signature matches the other checks.

    Returns:
        None.
    """
    raw_audio, _ = (
        ffmpeg
        .input(stream_url, t=2)
        # 'copy' passes the audio through unchanged -- assumes the stream's
        # audio is already AAC, since it is loaded as "aac" below; TODO confirm
        .output('pipe:', f="adts", acodec='copy')
        # stdout has to be captured: without it run() returns None for the
        # output and the audio bytes end up on this process's own stdout.
        .run(capture_stdout=True, capture_stderr=False)
    )
    sound = AudioSegment.from_file(io.BytesIO(raw_audio), "aac")
    play(sound)
|
|
|
|
# check_if_audio_is_valid('rtsp://172.22.246.207/extron1')
|