import io import logging import sys import ffmpeg import os import tempfile from PIL import Image from pydub import AudioSegment from pydub.playback import play logger = logging.getLogger("lrc.tools.recorder_streams_sanity_checks") def is_single_color_image(image): single_color_image = True color = {} count = 0 color_channels = image.split() for c in color_channels: # r, g, b hist = c.histogram() num_of_non_zero_values = len([v for v in hist if v != 0]) logger.debug("color_channel: {}, num_of_non_zero_values (NON-BLACK): {}".format(c, num_of_non_zero_values)) if num_of_non_zero_values > 1: single_color_image = False break else: color[count] = [i for i in enumerate(hist) if i[1] != 0][0][0] count += 1 return single_color_image, color def check_frame_is_valid(stream_url, raise_errors=True): try: frame, _ = ( ffmpeg .input(stream_url) .output('pipe:', vframes=1, format='image2', vcodec='mjpeg') .run(capture_stdout=True, capture_stderr=True) ) image = Image.open(io.BytesIO(frame)) single_color_image, color = is_single_color_image(image) if not single_color_image: image.show() return True, "all good :-)" else: if all(map(lambda x: x == 0, color.values())): return False, "Image is entirely black! (color: {} (RGB))".format( ':'.join([str(x) for x in color.values()])) return False, "Image has only one single color! (color: {} (RGB))".format( ':'.join([str(x) for x in color.values()])) except ffmpeg.Error as e: msg = "Could not connect to stream URL or other error!" logger.error(msg) try: msg += " ({})".format(e.stderr.decode('utf-8').strip().split("\n")[-1]) except IndexError: pass if raise_errors: raise Exception(msg) else: return False, msg def check_if_audio_is_valid(stream_url, sample_length_sec=3, lower_alert_limit_dBFS=-40.0, raise_errors=True): file_name = tempfile.NamedTemporaryFile(suffix='.aac').name if os.path.exists(file_name): os.remove(file_name) try: frame, _ = ( ffmpeg .input(stream_url, t=sample_length_sec) .output(file_name) .run(capture_stdout=True, capture_stderr=True) ) sound = AudioSegment.from_file(file_name, "aac") # print(sound.dBFS) # play(sound) if sound.max_dBFS == -float('inf'): return False, "No active audio signal detected!" elif sound.max_dBFS < lower_alert_limit_dBFS: return False, "Very low volume (< {} dBFS) detected! ({})".format(lower_alert_limit_dBFS, sound.max_dBFS) return True, "good audio signal detected! ({} max dBFS in {}s sample)".format(sound.max_dBFS, sample_length_sec) except ffmpeg.Error as e: msg = "Could not connect to stream URL or other error!" logger.error(msg) try: msg += " ({})".format(e.stderr.decode('utf-8').strip().split("\n")[-1]) except IndexError: pass if raise_errors: raise Exception(msg) else: return False, msg """ Following code is not working correctly - ffmpeg parameters are wrong. """ """ def check_if_audio_is_valid_stream(stream_url, raise_errors=True): audio, _ = ( ffmpeg .input(stream_url, t=2) .output('pipe:', f="adts", acodec='copy') .run(capture_stdout=False, capture_stderr=False) ) audio = io.BytesIO(audio) sound = AudioSegment.from_file(audio, "aac") play(sound) # check_if_audio_is_valid('rtsp://172.22.246.207/extron1') """ if __name__ == '__main__': # print(check_frame_is_valid('rtsp://172.22.246.207/extron2')) print(check_if_audio_is_valid('rtsp://172.22.246.207/extron1'))