From 519dadc3357cb3ea579e2237c786fe135af2f504 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 3 Jun 2025 14:07:05 +0000 Subject: [PATCH] Updates and fixes --- README.rst | 24 +- load_generator/common/dash_emulation.py | 112 +++--- load_generator/common/dash_utils.py | 367 +++++++----------- load_generator/common/hls_emulation.py | 128 +++--- .../common/hls_emulation_with_buffer.py | 89 +++++ load_generator/config/default.py | 170 +++----- load_generator/locustfiles/dash_sequence.py | 53 +-- load_generator/locustfiles/hls_player.py | 69 ++-- .../locustfiles/vod_dash_hls_sequence.py | 71 ++-- requirements.txt | 6 +- 10 files changed, 478 insertions(+), 611 deletions(-) create mode 100644 load_generator/common/hls_emulation_with_buffer.py diff --git a/README.rst b/README.rst index 0690aa4..f7e05c5 100644 --- a/README.rst +++ b/README.rst @@ -67,9 +67,12 @@ Load test example play_mode=full_playback \ bitrate=lowest_bitrate \ locust -f load_generator/locustfiles/vod_dash_hls_sequence.py \ - --no-web -c 1 -r 1 --run-time 10s --only-summary \ - --csv=test-results/output_example - + --headless \ + --users 1 \ + --spawn-rate 1 \ + --run-time 10s \ + --only-summary \ + --csv=test-results/output_example Use the tool through a Docker image @@ -109,9 +112,18 @@ Load test example -e "LOCUST_ONLY_SUMMARY=true" \ -p 8089:8089 \ unifiedstreaming/load-generator:latest \ - --no-web - - + --headless \ + +Changes 2025-06-03 +^^^^^^^^^^^^^^^^^^ +- Update to latest version of Locust (2.37.6) +- Update to newer version of requests +- Updated Python scripts to work with new Locust version +- Added new option "with_buffer" + +A new option is added "with_buffer=true" that can be used in combination with "mode=vod". +It will load 3 segments first; then wait the time of one segment-length between loading +each next segment. diff --git a/load_generator/common/dash_emulation.py b/load_generator/common/dash_emulation.py index bc63975..7d27b8b 100644 --- a/load_generator/common/dash_emulation.py +++ b/load_generator/common/dash_emulation.py @@ -1,112 +1,92 @@ import os import sys import logging -from locust import TaskSequence, seq_task, TaskSet, task +import random + +from locust import SequentialTaskSet, task +from locust.exception import StopUser from mpegdash.parser import MPEGDASHParser from load_generator.common import dash_utils from load_generator.config import default -import random -from locust.exception import StopLocust logger = logging.getLogger(__name__) MANIFEST_FILE = os.getenv('MANIFEST_FILE') -PLAY_MODE = os.getenv("play_mode") -BUFFER_SIZE = os.getenv("buffer_size") -BUFFER_SIZE = int(BUFFER_SIZE) # Cast os.environ str to int -BITRATE = os.getenv("bitrate") - +PLAY_MODE = os.getenv("play_mode", "full_playback") +BUFFER_SIZE = int(os.getenv("buffer_size", "0")) +BITRATE = os.getenv("bitrate", "highest_bitrate") LOGGER = True -class class_dash_player(TaskSet): - """ - Simple MPEG-DASH emulation of a player - Receives an MPEG-DASH /.mpd manifest - """ +class class_dash_player(SequentialTaskSet): base_url = None mpd_body = None mpd_object = None - print("started task") - @seq_task(1) + @task def get_manifest(self): - print("MPEG-DASH child player running ...") - base_url = f"{self.locust.host}/{MANIFEST_FILE}" + logger.info("MPEG-DASH player starting...") + base_url = f"{self.user.host}/{MANIFEST_FILE}" if LOGGER: - print(base_url) - self.base_url = f"{base_url}" # It should already be a /.mpd - logger.info(f"Requesting manifest: {base_url}") - response_mpd = self.client.get(f"{base_url}", name="merged") + print(f"Requesting: {base_url}") + self.base_url = base_url + + response_mpd = self.client.get(base_url, name="merged") self.mpd_body = response_mpd.text - if response_mpd.status_code == 0 or response_mpd.status_code == 404: - logger.info("Make sure your Manifest URI is reachable") - try: - sys.exit(1) - except SystemExit: - os._exit(1) - else: - pass - - @seq_task(2) - def dash_parse(self, reschedule=True): - """ - Parse Manifest file to MPEGDASHParser - """ - logger.info("Obtained MPD body ") - if self.mpd_body is not None: - self.mpd_object = MPEGDASHParser.parse(self.mpd_body) - print(f"self.mpd_object: {self.mpd_object}") - else: - # self.interrupt() - pass - @seq_task(3) + if response_mpd.status_code in [0, 404]: + logger.error("Manifest unreachable") + self.environment.runner.quit() # Stop the test early + raise StopUser() + + @task + def dash_parse(self): + if not self.mpd_body: + logger.warning("No MPD content to parse.") + raise StopUser() + + logger.info("Parsing MPD content...") + self.mpd_object = MPEGDASHParser.parse(self.mpd_body) + + @task def dash_playback(self): - """ - Create a list of the avaialble segment URIs with - its specific media representation - """ - logger.info("Dash playback") + logger.info("Starting DASH playback...") + all_reprs, period_segments = dash_utils.prepare_playlist( self.base_url, self.mpd_object ) - if all_reprs != 0 and period_segments != 0: + + if all_reprs and period_segments: selected_representation = dash_utils.select_representation( - period_segments["abr"], - BITRATE # highest_bitrate, lowest_bitrate, random_bitrate + period_segments["abr"], BITRATE ) + chosen_video = selected_representation[1] chosen_audio = selected_representation[0] + if PLAY_MODE == "full_playback": if BUFFER_SIZE == 0: dash_utils.simple_playback( - self, - period_segments, - chosen_video, - chosen_audio, - False # Delay in between every segment request + self, period_segments, chosen_video, chosen_audio, False ) else: dash_utils.playback_w_buffer( - self, - period_segments, - chosen_video, - chosen_audio, - BUFFER_SIZE + self, period_segments, chosen_video, chosen_audio, BUFFER_SIZE ) elif PLAY_MODE == "only_manifest": self.get_manifest() else: - # select random segments: one for audio content and second for - # video video_timeline = period_segments["repr"][chosen_video]["timeline"] audio_timeline = period_segments["repr"][chosen_audio]["timeline"] + video_segment = random.choice(video_timeline) audio_segment = random.choice(audio_timeline) - logger.info(video_segment["url"]) + + logger.info(f"Fetching video segment: {video_segment['url']}") self.client.get(video_segment["url"]) - logger.info(audio_segment["url"]) + + logger.info(f"Fetching audio segment: {audio_segment['url']}") self.client.get(audio_segment["url"]) else: - print("Peridos not found in the MPD body") + logger.error("No periods or representations found in MPD.") + raise StopUser() diff --git a/load_generator/common/dash_utils.py b/load_generator/common/dash_utils.py index fc83817..6639971 100755 --- a/load_generator/common/dash_utils.py +++ b/load_generator/common/dash_utils.py @@ -2,10 +2,10 @@ import time import logging import random +import gevent logger = logging.getLogger(__name__) - HIGHEST_BITRATE = "highest_bitrate" LOWEST_BITRATE = "lowest_bitrate" @@ -13,141 +13,106 @@ def get_segment_url(ism_url, base_url, media_segment): - return( - f"{ism_url}/{base_url}{media_segment}" - ) + return f"{ism_url}/{base_url}{media_segment}" -def create_segment_timeline( - ism_url, base_url, media_segment, time, segment_duration): +def create_segment_timeline(ism_url, base_url, media_segment, time, segment_duration): url = get_segment_url(ism_url, base_url, media_segment) - segment = {} - segment["time"] = time - segment["url"] = url - segment["duration"] = segment_duration - return segment - - -def create_segment( - media_segment, ism_url, protocol, time, segment_duration, - segment_timeline): - - segment = create_segment_timeline( - ism_url, - protocol, - media_segment, - time, - segment_duration - ) + return { + "time": time, + "url": url, + "duration": segment_duration + } + + +def create_segment(media_segment, ism_url, protocol, time, segment_duration, segment_timeline): + segment = create_segment_timeline(ism_url, protocol, media_segment, time, segment_duration) segment_timeline.append(segment) - time = time + segment_duration + time += segment_duration return time, segment_timeline -def create_segments_timeline( - ism_url, protocol, media, representation, timeline): - """ - host: string - ism_file: string - base_url: string - representation: string - timeline: SegmentTimeline object from MPEGDASHParser - """ +def create_segments_timeline(ism_url, protocol, media, representation, timeline): segment_timeline = [] time = 0 media_repr = media.replace("$RepresentationID$", representation) + for segment in timeline.Ss: segment_duration = segment.d if segment.t is not None and segment.r is not None: time = segment.t - media_segment = media_repr.replace("$Time$", str(time)) - r = segment.r + 1 - for i in range(0, r): - time, segment_timeline = create_segment( - media_segment, ism_url, - protocol, - time, - segment_duration, - segment_timeline - ) + for _ in range(segment.r + 1): media_segment = media_repr.replace("$Time$", str(time)) + time, segment_timeline = create_segment(media_segment, ism_url, protocol, time, segment_duration, segment_timeline) elif segment.t is None and segment.r is None: media_segment = media_repr.replace("$Time$", str(time)) - time, segment_timeline = create_segment( - media_segment, ism_url, - protocol, - time, - segment_duration, - segment_timeline - ) + time, segment_timeline = create_segment(media_segment, ism_url, protocol, time, segment_duration, segment_timeline) elif segment.t is not None and segment.r is None: time = segment.t media_segment = media_repr.replace("$Time$", str(time)) - time, segment_timeline = create_segment( - media_segment, ism_url, - protocol, - time, - segment_duration, - segment_timeline - ) + time, segment_timeline = create_segment(media_segment, ism_url, protocol, time, segment_duration, segment_timeline) else: - r = segment.r + 1 - for i in range(0, r): + for _ in range(segment.r + 1): media_segment = media_repr.replace("$Time$", str(time)) - time, segment_timeline = create_segment( - media_segment, ism_url, - protocol, - time, - segment_duration, - segment_timeline - ) + time, segment_timeline = create_segment(media_segment, ism_url, protocol, time, segment_duration, segment_timeline) + return segment_timeline def prepare_playlist(ism_url, mpd): - """ - mpd: MPEGDASHParser object - Returns - all_reprs: all available representation from MPD manifest - period_s: dict of timelines for each representation and dict of bitrates - available - """ logger.info("Preparing timeline...") all_reprs = [] period_s = {} - dir_mpd = dir(mpd) - if "periods" in dir_mpd: + + if hasattr(mpd, "periods"): for period in mpd.periods: - base_urls = (period.base_urls) - protocol = base_urls[0].base_url_value # only dash/ is in index 0 + base_urls = period.base_urls + + if not base_urls: + logger.warning("No BaseURL found in period. Using default '/' as fallback.") + protocol = "" + else: + protocol = base_urls[0].base_url_value + period_s["repr"] = {} period_s["abr"] = {} for adapt_set in period.adaptation_sets: - timeline = [] - content_type = adapt_set.content_type - period_s["abr"][content_type] = {} - period_s["abr"][content_type]["representation"] = [] - period_s["abr"][content_type]["bandwidth"] = [] + segment_template = adapt_set.segment_templates[0] + + if not segment_template.segment_timelines: + logger.error("SegmentTimeline is missing. This MPD is not supported by current playback logic.") + return 0, 0 + + timeline = segment_template.segment_timelines[0] + media = adapt_set.segment_templates[0].media - timeline = adapt_set.segment_templates[0].segment_timelines[0] timescale = adapt_set.segment_templates[0].timescale + content_type = adapt_set.content_type + + period_s["abr"].setdefault(content_type, { + "representation": [], + "bandwidth": [] + }) + for repr in adapt_set.representations: all_reprs.append(repr) - respresentation = repr.id + representation_id = repr.id bandwidth = repr.bandwidth - content_type = adapt_set.content_type + segments_urls = create_segments_timeline( - ism_url, protocol, media, respresentation, timeline + ism_url, protocol, media, representation_id, timeline ) number_segments = len(segments_urls) - period_s["repr"][respresentation] = {} - period_s["repr"][respresentation]["timeline"] = segments_urls - period_s["repr"][respresentation]["bandwidth"] = bandwidth - period_s["repr"][respresentation]["contentType"] = content_type - period_s["repr"][respresentation]["timescale"] = timescale - period_s["repr"][respresentation]["size"] = number_segments - period_s["abr"][content_type]["representation"].append(respresentation) + + period_s["repr"][representation_id] = { + "timeline": segments_urls, + "bandwidth": bandwidth, + "contentType": content_type, + "timescale": timescale, + "size": number_segments + } + period_s["abr"][content_type]["representation"].append(representation_id) period_s["abr"][content_type]["bandwidth"].append(bandwidth) return all_reprs, period_s @@ -158,198 +123,146 @@ def prepare_playlist(ism_url, mpd): def get_segment_duration(period_segments, chosen_quality, index): duration = period_segments["repr"][chosen_quality]["timeline"][index]["duration"] timescale = period_segments["repr"][chosen_quality]["timescale"] - segment_duration = duration/timescale - return segment_duration + return duration / timescale def simple_playback(self, period_segments, chosen_video, chosen_audio, delay): number_video_segments = period_segments["repr"][chosen_video]["size"] - for i in range(0, number_video_segments - 1): + for i in range(number_video_segments - 1): video_segment = period_segments["repr"][chosen_video]["timeline"][i]["url"] self.client.get(video_segment, name="merged") if LOGGER_SEGMENTS: print(video_segment) - segment_duration = get_segment_duration( - period_segments, chosen_video, i - ) + + segment_duration = get_segment_duration(period_segments, chosen_video, i) if delay: logger.info(f"Sleeping client for: {segment_duration} seconds") - self._sleep(segment_duration) - else: - pass - for j in range(0, 2): + gevent.sleep(segment_duration) + + for j in range(2): audio_segment = period_segments["repr"][chosen_audio]["timeline"][i + j]["url"] self.client.get(audio_segment, name="merged") if LOGGER_SEGMENTS: print(audio_segment) - segment_duration = get_segment_duration( - period_segments, chosen_video, i - ) - logger.info("******* Finished playing the whole timeline *******") - def simple_live_playback(self, period_segments, chosen_video, chosen_audio, delay): number_video_segments = period_segments["repr"][chosen_video]["size"] - for i in range(0, int(number_video_segments/2) - 1): + for i in range(int(number_video_segments / 2) - 1): video_segment = period_segments["repr"][chosen_video]["timeline"][i]["url"] response = self.client.get(video_segment) period_segments["repr"][chosen_video]["timeline"].pop(i) + if LOGGER_SEGMENTS: - print(video_segment) - print(response.status_code) - segment_duration = get_segment_duration( - period_segments, chosen_video, i - ) + print(video_segment, response.status_code) + + segment_duration = get_segment_duration(period_segments, chosen_video, i) if delay: logger.info(f"Sleeping client for: {segment_duration} seconds") - self._sleep(segment_duration) - else: - pass - for j in range(0, 2): + gevent.sleep(segment_duration) + + for j in range(2): audio_segment = period_segments["repr"][chosen_audio]["timeline"][i + j]["url"] self.client.get(audio_segment) if LOGGER_SEGMENTS: print(audio_segment) - segment_duration = get_segment_duration( - period_segments, chosen_video, i - ) - return period_segments + logger.info("******* Finished playing the whole timeline *******") + return period_segments def simple_buffer(self, segment_count, buffer_size, segment_duration): - min_buffer = buffer_size/2 + min_buffer = buffer_size / 2 if segment_count >= min_buffer: - # wait for the last segment duration - logger.info(f"Buffering for {segment_duration} seconds ") - time.sleep(segment_duration) - self._sleep(segment_duration) + logger.info(f"Buffering for {segment_duration} seconds") + gevent.sleep(segment_duration) segment_count = 0 return segment_count def get_channel_rate(http_response): - """ - Calculate channel_rate based on HTTP request - http_response: requests.response object - return: - chnnel_rate [kbps] - download_duration of segment: [seconds] - """ - channel_rate = None - download_duration = None - content_length = None - code = http_response.status_code - if code == 200: - content_length = http_response.headers['Content-Length'] - content_length = int(content_length) - elapsed = http_response.elapsed - if elapsed.seconds is not None: - microseconds = elapsed.microseconds - seconds = elapsed.seconds - microseconds = microseconds / 1000000 # microseconds to seconds - download_duration = microseconds + seconds # [seconds] - content_length = content_length * 8 # KB to kilobits - channel_rate = content_length / (download_duration * 1000) # [kbps] + channel_rate = 0 + download_duration = 0 + content_length = http_response.headers.get('Content-Length') + + if http_response.status_code == 200 and content_length: + try: + content_length = int(content_length) + elapsed = http_response.elapsed + seconds = elapsed.seconds + (elapsed.microseconds / 1_000_000) + download_duration = seconds + channel_rate = (content_length * 8) / (download_duration * 1000) + except Exception as e: + logger.warning(f"Failed to calculate channel rate: {e}") else: - logger.error(f"Error request with code: {code} ") - channel_rate = 0 - content_length = None - download_duration = None + logger.error(f"Error request with code: {http_response.status_code}") return channel_rate, download_duration -def buffer_model( - self, buffer_level, segment_duration, download_duration, max_buffer): - """ - self: locust TaskSequence object - buffer_level at epoch t: float [seconds] - segment_duration: float [seconds] - download_duration of segment t [seconds] - max_buffer: int [seconds] - Returns: - updated buffer_level - """ +def buffer_model(self, buffer_level, segment_duration, download_duration, max_buffer): logger.info(f"Buffer level: {buffer_level} seconds") - delta_t = buffer_level - download_duration + segment_duration - max_buffer - delta_t = abs(delta_t) + delta_t = abs(buffer_level - download_duration + segment_duration - max_buffer) diff = buffer_level - download_duration + segment_duration - # Update buffer level - buffer_level = buffer_level + segment_duration - download_duration - if (diff < max_buffer): - # Request (t + 1)-th segment + buffer_level += segment_duration - download_duration + + if diff < max_buffer: return buffer_level else: - buffer_level -= delta_t # Creates playback + buffer_level -= delta_t logger.info(f"Buffering for {delta_t} seconds") - self._sleep(delta_t) - time.sleep(delta_t) # Wait before (t + 1)-th segment is requested + gevent.sleep(delta_t) return buffer_level -def playback_w_buffer( - self, period_segments, chosen_video, chosen_audio, max_buffer=10): - """ - Apply buffer by max_buffer parameter - """ - if isinstance(max_buffer, int): - if LOGGER_SEGMENTS: - logger.info(f"Buffer size: {max_buffer}") - segment_count = 1 # empty buffer initialized - buffer_level = 0 # buffer starts empty - number_video_segments = period_segments["repr"][chosen_video]["size"] - segments = period_segments["repr"] - - for i in range(0, number_video_segments - 1): - video_segment = segments[chosen_video]["timeline"][i]["url"] - logger.info(video_segment) - response = self.client.get(video_segment, name="merged") - channel_rate, download_duration = get_channel_rate(response) - segment_duration = get_segment_duration( - period_segments, chosen_video, i - ) - if LOGGER_SEGMENTS: - logger.info(f"Video segment duration: {segment_duration} seconds") - buffer_level = buffer_model( - self, - buffer_level, segment_duration, download_duration, max_buffer - ) - segment_count += i - if LOGGER_SEGMENTS: - logger.info(f"Number of segments in buffer: {segment_count}") - for j in range(0, 2): - audio_segment = segments[chosen_audio]["timeline"][i+j]["url"] - logger.info(audio_segment) - self.client.get(audio_segment, name="merged") - segment_duration = get_segment_duration( - period_segments, chosen_audio, i+j - ) - if LOGGER_SEGMENTS: - logger.info( - f"Audio segment duration : {segment_duration} seconds" - ) - - else: +def playback_w_buffer(self, period_segments, chosen_video, chosen_audio, max_buffer=10): + if not isinstance(max_buffer, int): logger.error("Your buffer size needs to be an integer") return + if LOGGER_SEGMENTS: + logger.info(f"Buffer size: {max_buffer}") + + segment_count = 1 + buffer_level = 0 + number_video_segments = period_segments["repr"][chosen_video]["size"] + segments = period_segments["repr"] + + for i in range(number_video_segments - 1): + video_segment = segments[chosen_video]["timeline"][i]["url"] + logger.info(video_segment) + response = self.client.get(video_segment, name="merged") + + channel_rate, download_duration = get_channel_rate(response) + segment_duration = get_segment_duration(period_segments, chosen_video, i) + + if LOGGER_SEGMENTS: + logger.info(f"Video segment duration: {segment_duration} seconds") + + buffer_level = buffer_model(self, buffer_level, segment_duration, download_duration, max_buffer) + segment_count += i + + if LOGGER_SEGMENTS: + logger.info(f"Number of segments in buffer: {segment_count}") + + for j in range(2): + audio_segment = segments[chosen_audio]["timeline"][i + j]["url"] + logger.info(audio_segment) + self.client.get(audio_segment, name="merged") + segment_duration = get_segment_duration(period_segments, chosen_audio, i + j) + + if LOGGER_SEGMENTS: + logger.info(f"Audio segment duration: {segment_duration} seconds") + def select_representation(abr, option): - """ - Select AdaptationSet with minimum or maximum bitrate - abr: dictionary with represenation[] and bandwidths[] - option: int 0-> lowest bitrate, 1-> highest bitrate - """ selected_audio = None selected_video = None - slected_type = ["audio", "video"] selected_representation = [] - for type_content, content in abr.items(): - if type_content in slected_type: + for type_content, content in abr.items(): + if type_content in ["audio", "video"]: if option == HIGHEST_BITRATE: bitrate = max(content["bandwidth"]) elif option == LOWEST_BITRATE: @@ -359,12 +272,12 @@ def select_representation(abr, option): index = content["bandwidth"].index(bitrate) representation = content["representation"][index] + if type_content == "video": selected_video = representation selected_representation.append(selected_video) elif type_content == "audio": selected_audio = representation selected_representation.append(selected_audio) - else: - pass + return selected_representation diff --git a/load_generator/common/hls_emulation.py b/load_generator/common/hls_emulation.py index 8b86951..6bea300 100644 --- a/load_generator/common/hls_emulation.py +++ b/load_generator/common/hls_emulation.py @@ -1,99 +1,73 @@ import os import m3u8 -from locust import TaskSet, task -from load_generator.config import default import logging import random +import gevent +from urllib.parse import urljoin, urlparse -logger = logging.getLogger(__name__) +from locust import TaskSet, task +from load_generator.config import default +logger = logging.getLogger(__name__) MANIFEST_FILE = os.getenv('MANIFEST_FILE') -PLAY_MODE = os.getenv("play_mode") -BUFFER_SIZE = os.getenv("buffer_size") -BUFFER_SIZE = int(BUFFER_SIZE) # Cast os.environ str to int -BITRATE = os.getenv("bitrate") - +PLAY_MODE = os.getenv("play_mode", "full_playback") +BUFFER_SIZE = int(os.getenv("buffer_size", "0")) +BITRATE = os.getenv("bitrate", "highest_bitrate") LOGGER_SEGMENTS = True class class_hls_player(TaskSet): - """ - Simple HLS emulation of a player - Receives an M3U8 manifest (/.m3u8) - """ @task(1) def hls_player_child(self): print("HLS child player running ...") - base_url = (f"{self.locust.host}/{MANIFEST_FILE}") + master_url = f"{self.user.host}{MANIFEST_FILE}" # Avoid extra slash - # get master HLS manifest - master_url = f"{base_url}" # It must be a /.m3u8 Master playlist if LOGGER_SEGMENTS: - print(master_url) - - if PLAY_MODE == "only_manifest": - master_m3u8 = self.client.get(master_url, name="merged") - m3u8.M3U8(content=master_m3u8.text, base_uri=base_url) - - elif PLAY_MODE == "full_playback": - # Retrieve segments with an specific buffer size - master_m3u8 = self.client.get(master_url, name="merged") - parsed_master_m3u8 = m3u8.M3U8(content=master_m3u8.text, base_uri=base_url) - - variant = self.select_bitrate(parsed_master_m3u8) - - variant_url = "{base_url}/{variant}".format(base_url=base_url, variant=variant.uri) - variant_m3u8 = self.client.get(variant_url, name="merged") - parsed_variant_m3u8 = m3u8.M3U8(content=variant_m3u8.text, base_uri=base_url) - - # get all the segments - for segment in parsed_variant_m3u8.segments: - if LOGGER_SEGMENTS: - print(segment.absolute_uri) - self.client.get(segment.absolute_uri, name="merged") - if BUFFER_SIZE != 0: - self._sleep(BUFFER_SIZE) - else: - # Select random segments - master_m3u8 = self.client.get(master_url, name="merged") - parsed_master_m3u8 = m3u8.M3U8(content=master_m3u8.text, base_uri=base_url) - - variant = self.select_bitrate(parsed_master_m3u8) - - variant_url = "{base_url}/{variant}".format(base_url=base_url, variant=variant.uri) - variant_m3u8 = self.client.get(variant_url, name="merged") - parsed_variant_m3u8 = m3u8.M3U8(content=variant_m3u8.text, base_uri=base_url) - - # get random segments - for segment in parsed_variant_m3u8.segments: - segment = random.choice(parsed_variant_m3u8.segments) - if LOGGER_SEGMENTS: - print(segment.absolute_uri) - self.client.get(segment.absolute_uri, name="merged") - if BUFFER_SIZE != 0 and isinstance(BUFFER_SIZE, int): - self._sleep(BUFFER_SIZE) - - def select_bitrate(self, parsed_master_m3u8): - bandwidth_list = [] - for playlist in parsed_master_m3u8.playlists: - bandwidth = playlist.stream_info.bandwidth - bandwidth_list.append(bandwidth) + print(f"Master URL: {master_url}") + + # Load master playlist + master_response = self.client.get(master_url, name="master") + parsed_master = m3u8.M3U8(content=master_response.text, base_uri=master_url) + + variant = self.select_bitrate(parsed_master) + variant_url = urljoin(master_url, variant.uri) + + try: + variant_response = self.client.get(variant_url, name=variant.uri) + except Exception as e: + logger.error(f"Failed variant: {variant.uri} -> {e}") + return + + variant_base_uri = variant_url.rsplit("/", 1)[0] + "/" + parsed_variant = m3u8.M3U8(content=variant_response.text, base_uri=variant_base_uri) + + segments = parsed_variant.segments + if PLAY_MODE == "random_segments": + segments = [random.choice(segments) for _ in range(len(segments))] + + for segment in segments: + segment_url = segment.absolute_uri + if LOGGER_SEGMENTS: + print(segment_url) + + try: + self.client.get(segment_url, name=segment.uri) + except Exception as e: + logger.error(f"Failed segment: {segment_url} -> {e}") + + if BUFFER_SIZE: + gevent.sleep(BUFFER_SIZE) + + def select_bitrate(self, parsed_master): + bandwidths = [p.stream_info.bandwidth for p in parsed_master.playlists] if BITRATE == "highest_bitrate": - max_bandwidth = bandwidth_list.index(max(bandwidth_list)) - variant = parsed_master_m3u8.playlists[max_bandwidth] + max_index = bandwidths.index(max(bandwidths)) + return parsed_master.playlists[max_index] elif BITRATE == "lowest_bitrate": - min_bandwidth = bandwidth_list.index(min(bandwidth_list)) - variant = parsed_master_m3u8.playlists[min_bandwidth] + min_index = bandwidths.index(min(bandwidths)) + return parsed_master.playlists[min_index] else: - # Select a random bitrate - variant = random.choice(parsed_master_m3u8.playlists) - - return variant - - def simple_buffer(self, segment): - seg_get = self.client.get(segment.absolute_uri, name="merged") - sleep = segment.duration - seg_get.elapsed.total_seconds() - self._sleep(sleep) + return random.choice(parsed_master.playlists) diff --git a/load_generator/common/hls_emulation_with_buffer.py b/load_generator/common/hls_emulation_with_buffer.py new file mode 100644 index 0000000..891fe39 --- /dev/null +++ b/load_generator/common/hls_emulation_with_buffer.py @@ -0,0 +1,89 @@ +import os +import m3u8 +import logging +import random +import gevent +from urllib.parse import urljoin + +from locust import TaskSet, task +from load_generator.config import default + +logger = logging.getLogger(__name__) + +# Environment Variables +MANIFEST_FILE = os.getenv("MANIFEST_FILE") +PLAY_MODE = os.getenv("play_mode", "full_playback") +BUFFER_SIZE = int(os.getenv("buffer_size", "30")) # target buffer in seconds +BITRATE = os.getenv("bitrate", "highest_bitrate") + +LOGGER_SEGMENTS = True + + +class class_hls_player_with_buffer(TaskSet): + @task(1) + def hls_realistic_playback(self): + print("HLS player starting realistic playback...") + master_url = f"{self.user.host}{MANIFEST_FILE}" # Avoid extra slash + + if LOGGER_SEGMENTS: + print(f"Master URL: {master_url}") + + # Load master playlist + master_response = self.client.get(master_url, name="master") + parsed_master = m3u8.M3U8(content=master_response.text, base_uri=master_url) + + # Choose bitrate + variant = self.select_bitrate(parsed_master) + variant_url = urljoin(master_url, variant.uri) + + try: + variant_response = self.client.get(variant_url, name=variant.uri) + except Exception as e: + logger.error(f"Failed variant: {variant.uri} -> {e}") + return + + parsed_variant = m3u8.M3U8(content=variant_response.text, base_uri=variant_url) + segments = parsed_variant.segments + + # Use actual durations if possible + segment_duration = segments[0].duration if segments else 10 # fallback + + # Pre-buffering phase + buffer_sec = 0 + i = 0 + while buffer_sec < BUFFER_SIZE and i < len(segments): + segment = segments[i] + try: + self.client.get(segment.absolute_uri, name=segment.uri) + if LOGGER_SEGMENTS: + print(f"Buffered segment: {segment.absolute_uri}") + buffer_sec += segment.duration + except Exception as e: + logger.error(f"Failed segment: {segment.absolute_uri} -> {e}") + i += 1 + + # Playback phase + for j in range(i, len(segments)): + gevent.sleep(segment_duration) # simulate playing segment_duration + buffer_sec -= segment_duration + + segment = segments[j] + try: + self.client.get(segment.absolute_uri, name=segment.uri) + if LOGGER_SEGMENTS: + print(f"Fetched segment: {segment.absolute_uri}") + buffer_sec += segment.duration + except Exception as e: + logger.error(f"Failed segment: {segment.absolute_uri} -> {e}") + + def select_bitrate(self, parsed_master): + bandwidths = [p.stream_info.bandwidth for p in parsed_master.playlists] + + if BITRATE == "highest_bitrate": + max_index = bandwidths.index(max(bandwidths)) + return parsed_master.playlists[max_index] + elif BITRATE == "lowest_bitrate": + min_index = bandwidths.index(min(bandwidths)) + return parsed_master.playlists[min_index] + else: + return random.choice(parsed_master.playlists) diff --git a/load_generator/config/default.py b/load_generator/config/default.py index 88d3200..18e36b2 100644 --- a/load_generator/config/default.py +++ b/load_generator/config/default.py @@ -1,126 +1,56 @@ import os -import sys import logging logger = logging.getLogger(__name__) -MANIFEST_FILE = None - -if "mode" in os.environ: - if os.environ["mode"] not in ["vod", "live"]: - logger.error("That is an incorrect input variable for 'mode'") - try: - sys.exit(0) - except SystemExit: - os._exit(0) - else: - mode = os.environ.get("mode") - print(f"The selected 'mode' variable is: {mode}") - -else: - print("You should specify the mode: 'vod' or 'live'") - try: - sys.exit(1) - except SystemExit: - os._exit(1) - -if "play_mode" in os.environ: - if os.environ["play_mode"] not in ["only_manifest", "full_playback", "random_segments"]: - print( - "You should specify a correct variable for 'play_mode' ENV" - " variable: 'only_manifest', 'full_playback', 'random_segments'" - ) - try: - sys.exit(0) - except SystemExit: - os._exit(0) - else: - play_mode = os.environ.get("play_mode") - print(f"The selected 'play_mode' variable is: {play_mode}") - -else: - # Default behaviour if play_mode is not set - os.environ["play_mode"] = "full_playback" - print("Default play_mode is set to: 'full_playback'") - -if "bitrate" in os.environ: - if os.environ["bitrate"] not in ["highest_bitrate", "lowest_bitrate", "random_bitrate"]: - print( - "You should specify a correct variable for 'bitrate' ENV" - " variable: highest_birtate, lowest_bitrate, random_bitrate" - ) - try: - sys.exit(0) - except SystemExit: - os._exit(0) - else: - bitrate = os.environ.get("bitrate") - print(f"The selected 'bitrate' variable is: {bitrate}") - -else: - # Default behaviour if bitrate is not set - os.environ["bitrate"] = "highest_bitrate" - print( - "'bitrate' ENV variable is not set. Default 'bitrate' is set to: " - "'highest_bitrate'" - ) - -# Create list of possible buffer_size -list_input = str(list(range(0, 10))) -if "buffer_size" in os.environ: - if os.environ["buffer_size"] not in list_input: - print( - "You should specify a correct variable for 'buffer_size' ENV" - " variable: integers from 0 to 10" - ) - try: - sys.exit(1) - except SystemExit: - os._exit(1) - else: - buffer_size = os.environ.get("buffer_size") - print(f"The selected 'buffer_size' variable is: {buffer_size}") -else: - os.environ["buffer_size"] = "0" - print( - "'buffer_size' ENV variable is not set. Default 'buffer_size'" - "is set to: 0" - ) - -if ("time_shift" in os.environ) and (os.environ.get("mode") == "live"): - if os.environ["time_shift"] not in ["-4", "-3", "-2", "-1", "0", "1"]: - print( - "You should specify a correct variable for 'time_shift' ENV" - " int variable: -4, -3, 2, 1, 0, 1" - ) - try: - sys.exit(1) - except SystemExit: - os._exit(1) - else: - mode = os.environ.get("mode") - print(f"Mode before starting time_shift is : {mode}") +def validate_env(): + mode = os.environ.get("mode") + if mode not in ["vod", "live"]: + raise ValueError("You must set ENV 'mode' to 'vod' or 'live'") + print(f"The selected 'mode' variable is: {mode}") + + play_mode = os.environ.get("play_mode", "full_playback") + if play_mode not in ["only_manifest", "full_playback", "random_segments"]: + raise ValueError("Invalid 'play_mode'. Use: only_manifest, full_playback, or random_segments") + os.environ["play_mode"] = play_mode + print(f"The selected 'play_mode' variable is: {play_mode}") + + bitrate = os.environ.get("bitrate", "highest_bitrate") + if bitrate not in ["highest_bitrate", "lowest_bitrate", "random_bitrate"]: + raise ValueError("Invalid 'bitrate'. Use: highest_bitrate, lowest_bitrate, or random_bitrate") + os.environ["bitrate"] = bitrate + print(f"The selected 'bitrate' variable is: {bitrate}") + + buffer_size = os.environ.get("buffer_size", "0") + if buffer_size not in map(str, range(0, 11)): + raise ValueError("Invalid 'buffer_size'. Use an integer from 0 to 10") + os.environ["buffer_size"] = buffer_size + print(f"The selected 'buffer_size' variable is: {buffer_size}") + + if mode == "live": time_shift = os.environ.get("time_shift") - print(f"The selected 'time_shift' variable is: {time_shift} with type: {type(time_shift)}") - -elif ("time_shift" in os.environ) and (os.environ.get("mode") == "vod"): - print( - "'time_shift' ENV can only be used with mode=live" - ) - try: - sys.exit(1) - except SystemExit: - os._exit(1) -else: - print("'time_shift' ENV variable is not set") - - -if "MANIFEST_FILE" not in os.environ: - print("You are required to set MANIFEST_FILE ENV variable ") - try: - sys.exit(1) - except SystemExit: - os._exit(1) -else: - MANIFEST_FILE = os.getenv('MANIFEST_FILE') - print(f"**** The manifest file is: {MANIFEST_FILE} ****") + if time_shift and time_shift not in ["-4", "-3", "-2", "-1", "0", "1"]: + raise ValueError("Invalid 'time_shift'. Use values: -4 to 1") + elif time_shift: + print(f"The selected 'time_shift' variable is: {time_shift} (type: {type(time_shift)})") + elif "time_shift" in os.environ: + raise ValueError("'time_shift' can only be used with mode=live") + + manifest_file = os.environ.get("MANIFEST_FILE") + if not manifest_file: + raise ValueError("You must set ENV variable 'MANIFEST_FILE'") + print(f"**** The manifest file is: {manifest_file} ****") + + return { + "mode": mode, + "play_mode": play_mode, + "bitrate": bitrate, + "buffer_size": int(buffer_size), + "manifest_file": manifest_file, + "time_shift": os.environ.get("time_shift"), + } + + +# Validate on import (optional — Locust files should import only if desired) +env_config = validate_env() +MANIFEST_FILE = env_config["manifest_file"] diff --git a/load_generator/locustfiles/dash_sequence.py b/load_generator/locustfiles/dash_sequence.py index 3ba8f84..2742a63 100755 --- a/load_generator/locustfiles/dash_sequence.py +++ b/load_generator/locustfiles/dash_sequence.py @@ -1,65 +1,53 @@ import os import sys -from locust import HttpLocust, between, seq_task, TaskSequence +import logging +import resource +from locust import HttpUser, SequentialTaskSet, task, between + from mpegdash.parser import MPEGDASHParser from load_generator.common import dash_utils from load_generator.config import default # ENV configuration -import logging if sys.version_info[0] < 3: raise Exception("Must be using Python 3") logger = logging.getLogger(__name__) print(resource.getrlimit(resource.RLIMIT_NOFILE)) -# set the highest limit of open files in the server -resource.setrlimit(resource.RLIMIT_NOFILE, resource.getrlimit( - resource.RLIMIT_NOFILE) -) +resource.setrlimit(resource.RLIMIT_NOFILE, resource.getrlimit(resource.RLIMIT_NOFILE)) MANIFEST_FILE = os.getenv('MANIFEST_FILE') -class UserBehaviour(TaskSequence): - """ - Example task sequences with global values - """ +class UserBehaviour(SequentialTaskSet): base_url = None mpd_body = None mpd_object = None - @seq_task(1) + @task def get_manifest(self): """ Retrieve the MPD manifest file """ - # mode = os.environ.get("mode") - base_url = f"{self.locust.host}/{MANIFEST_FILE}" - self.base_url = base_url - logger.info(f"Requesting manifest: {base_url}/.mpd") - response_mpd = self.client.get(f"{base_url}/.mpd") + self.base_url = f"{self.user.host}/{MANIFEST_FILE}" + logger.info(f"Requesting manifest: {self.base_url}/.mpd") + response_mpd = self.client.get(f"{self.base_url}/.mpd") self.mpd_body = response_mpd.text - # Exit the program if the Manifest file is not reachable + if response_mpd.status_code == 0: - logger.error( - f"Make sure your Manifest URI is reachable: {base_url}" - ) - try: - sys.exit(1) - except SystemExit: - os._exit(1) + logger.error(f"Manifest URI not reachable: {self.base_url}") + sys.exit(1) - @seq_task(2) + @task def dash_parse(self): """ Parse Manifest file to MPEGDASHParser """ self.mpd_object = MPEGDASHParser.parse(self.mpd_body) - @seq_task(3) + @task def dash_playback(self): """ - Create a list of the avaialble segment URIs with - its specific media representation + Generate segment URIs and simulate playback """ all_reprs, period_segments = dash_utils.prepare_playlist( self.base_url, self.mpd_object @@ -67,9 +55,10 @@ def dash_playback(self): bitrate = os.environ.get("bitrate") selected_representation = dash_utils.select_representation( period_segments["abr"], - bitrate # highest_bitrate, lowest_bitrate, random_bitrate + bitrate # Options: highest_bitrate, lowest_bitrate, random_bitrate ) - buffer_size = int(os.environ.get("buffer_size")) + buffer_size = int(os.environ.get("buffer_size", 0)) + if buffer_size == 0: dash_utils.simple_playback( self, @@ -88,7 +77,7 @@ def dash_playback(self): ) -class MyLocust(HttpLocust): +class WebsiteUser(HttpUser): host = os.getenv('HOST_URL', "http://localhost") - task_set = UserBehaviour wait_time = between(0, 0) + tasks = [UserBehaviour] diff --git a/load_generator/locustfiles/hls_player.py b/load_generator/locustfiles/hls_player.py index ac1d14b..bbcdfb9 100644 --- a/load_generator/locustfiles/hls_player.py +++ b/load_generator/locustfiles/hls_player.py @@ -1,73 +1,60 @@ -################################################## -# Simple emulator of an HLS media player -################################################## -# MIT License -################################################## -# Author: Mark Ogle -# License: MIT -# Email: mark@unified-streaming.com -# Maintainer: roberto@unified-streaming.com -################################################## - import os -from locust import HttpLocust, TaskSet, task, between -import m3u8 +import sys import logging import resource -import sys import random +import m3u8 +import gevent +from locust import HttpUser, TaskSet, task, between if sys.version_info[0] < 3: raise Exception("Must be using Python 3") logger = logging.getLogger(__name__) print(resource.getrlimit(resource.RLIMIT_NOFILE)) -# set the highest limit of open files in the server -resource.setrlimit(resource.RLIMIT_NOFILE, resource.getrlimit( - resource.RLIMIT_NOFILE) -) +resource.setrlimit(resource.RLIMIT_NOFILE, resource.getrlimit(resource.RLIMIT_NOFILE)) MANIFEST_FILE = os.getenv('MANIFEST_FILE') class PlayerTaskSet(TaskSet): - @task(1) + @task def play_stream(self): """ - Play complete stream. - Steps: - * get manifest - * select highest bitrate - * get each segment in order - * wait for segment duration in between downloads, to act somewhat like - a player kinda dumb hack to make results gathering easier is to merge - everything into a single name + Play complete stream: + - Get manifest + - Pick a variant + - Get all segments in sequence + - Wait for segment duration minus download time """ - base_url = (f"{self.locust.host}/{MANIFEST_FILE}") + base_url = f"{self.user.host}/{MANIFEST_FILE}" - # get manifest - # single content + # Fetch master playlist master_url = f"{base_url}/.m3u8" - master_m3u8 = self.client.get(master_url, name="merged") + master_m3u8 = self.client.get(master_url, name=segment.uri) parsed_master_m3u8 = m3u8.M3U8(content=master_m3u8.text, base_uri=base_url) + # Pick a random variant random_variant = random.choice(parsed_master_m3u8.playlists) - variant_url = "{base_url}/{variant}".format(base_url=base_url, variant=random_variant.uri) - variant_m3u8 = self.client.get(variant_url, name="merged") + variant_url = f"{base_url}/{random_variant.uri}" + variant_m3u8 = self.client.get(variant_url, name=segment.uri) parsed_variant_m3u8 = m3u8.M3U8(content=variant_m3u8.text, base_uri=base_url) - # get all the segments + # Fetch each segment for segment in parsed_variant_m3u8.segments: - logger.debug("Getting segment {0}".format(segment.absolute_uri)) + logger.debug(f"Getting segment {segment.absolute_uri}") seg_get = self.client.get(segment.absolute_uri) - sleep = segment.duration - seg_get.elapsed.total_seconds() - logger.debug("Request took {elapsed} and segment duration is {duration}. Sleeping for {sleep}".format( - elapsed=seg_get.elapsed.total_seconds(), duration=segment.duration, sleep=sleep)) - self._sleep(sleep) + sleep_duration = segment.duration - seg_get.elapsed.total_seconds() + logger.debug( + f"Request took {seg_get.elapsed.total_seconds()}s, " + f"segment duration {segment.duration}s, sleeping {sleep_duration}s" + ) + if sleep_duration > 0: + gevent.sleep(sleep_duration) -class MyLocust(HttpLocust): +class WebsiteUser(HttpUser): host = os.getenv('HOST_URL', "http://localhost") - task_set = PlayerTaskSet wait_time = between(0, 0) + tasks = [PlayerTaskSet] diff --git a/load_generator/locustfiles/vod_dash_hls_sequence.py b/load_generator/locustfiles/vod_dash_hls_sequence.py index 1eeddb1..d8ec621 100755 --- a/load_generator/locustfiles/vod_dash_hls_sequence.py +++ b/load_generator/locustfiles/vod_dash_hls_sequence.py @@ -1,64 +1,59 @@ import sys import os -from locust import HttpLocust, between, TaskSequence -# seq_task -# from load_generator.common.dash_emulation import class_dash_player -from load_generator.common.dash_emulation import class_dash_player -from load_generator.common.hls_emulation import class_hls_player import logging import resource +from locust import HttpUser, SequentialTaskSet, task, between + +from load_generator.common.dash_emulation import class_dash_player +from load_generator.common.hls_emulation import class_hls_player +from load_generator.common.hls_emulation_with_buffer import class_hls_player_with_buffer logger = logging.getLogger(__name__) if sys.version_info[0] < 3: - raise Exception("You must usea version above Python 3") + raise Exception("You must use Python 3 or higher") -logger = logging.getLogger(__name__) +# Print and set open file limits print(resource.getrlimit(resource.RLIMIT_NOFILE)) -# set the highest limit of open files in the server resource.setrlimit(resource.RLIMIT_NOFILE, resource.getrlimit(resource.RLIMIT_NOFILE)) MANIFEST_FILE = os.getenv('MANIFEST_FILE') +USE_HLS_BUFFER= os.getenv('with_buffer', 'false') - -class Client(TaskSequence): - """ - Verifies if it is a MPEG-DASH or HLS manifest - """ +class ClientBehavior(SequentialTaskSet): def on_start(self): - base_url = f"{self.locust.host}/{MANIFEST_FILE}" - self.base_url = base_url - if base_url.endswith(".mpd"): + self.base_url = f"{self.user.host}/{MANIFEST_FILE}" + if self.base_url.endswith(".mpd"): logger.info("It is a MPEG-DASH URI") - self.schedule_task(class_dash_player) # --> load_generator.common.dash_emulation - elif base_url.endswith(".m3u8"): + self.player_class = class_dash_player + elif self.base_url.endswith(".m3u8?DVR"): logger.info("It is a HLS URI") - self.schedule_task(class_hls_player) # -> load_generator.common.hls_emulation + if USE_HLS_BUFFER == 'true': + logging.info("HLS with playback-aware segment fetching") + self.player_class = class_hls_player_with_buffer + else: + self.player_class = class_hls_player else: logger.error( - "The URI provided is not supported for MPEG-DASH or " - "HLS media endpoint. Make sure the MANIFEST_FILE " - "envrionment ends with '.mpd' or '.m3u8'" + "Unsupported manifest URI. Make sure MANIFEST_FILE ends with '.mpd' or '.m3u8'" ) - try: - sys.exit(0) - except SystemExit: - os._exit(0) + sys.exit(1) - # Check if the Manifest is available before proceeding to test - manifest_response = self.client.get(self.base_url, name="merged") + # Check manifest availability + manifest_response = self.client.get(self.base_url, name="manifest_check") if manifest_response.status_code != 200: - logger.error( - f"The Manifest endpoint is not reachable. Verify that " - f" you can reach the Manifest file: {self.base_url}" - ) - try: - sys.exit(1) - except SystemExit: - os._exit(1) + logger.error(f"Manifest unreachable: {self.base_url}") + sys.exit(1) + + @task + def play(self): + # Run the player logic (assumes these are callable classes/functions) + player = self.player_class(self) + player.base_url = self.base_url + player.run() # Assumes your class_dash_player/class_hls_player has a `run()` method -class MyLocust(HttpLocust): +class WebsiteUser(HttpUser): host = os.getenv('HOST_URL', "http://localhost") - task_set = Client wait_time = between(0, 0) + tasks = [ClientBehavior] diff --git a/requirements.txt b/requirements.txt index 8c311ab..bb37f5c 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,6 @@ pytest coverage -locustio==0.14.4 +locust==2.37.6 mpegdash==0.2.0 m3u8==0.3.6 -requests==2.23.0 - --e . +requests>=2.26.0