From 632ffec040c979e5fc3dead311f5f1c49b05a9df Mon Sep 17 00:00:00 2001 From: Umesh Date: Wed, 6 Sep 2023 19:58:55 -0500 Subject: [PATCH 1/5] WIP-#39: Initial Commit. Add Deepgram node --- chimerapy/pipelines/speech_to_text/__init__.py | 0 .../pipelines/speech_to_text/deepgram_node.py | 17 +++++++++++++++++ 2 files changed, 17 insertions(+) create mode 100644 chimerapy/pipelines/speech_to_text/__init__.py create mode 100644 chimerapy/pipelines/speech_to_text/deepgram_node.py diff --git a/chimerapy/pipelines/speech_to_text/__init__.py b/chimerapy/pipelines/speech_to_text/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chimerapy/pipelines/speech_to_text/deepgram_node.py b/chimerapy/pipelines/speech_to_text/deepgram_node.py new file mode 100644 index 0000000..2db4d44 --- /dev/null +++ b/chimerapy/pipelines/speech_to_text/deepgram_node.py @@ -0,0 +1,17 @@ +from typing import Optional + +from deepgram import Deepgram + +import chimerapy.engine as cpe +from chimerapy.orchestrator import step_node + + +@step_node(name="CPPipelines_DeepgramNode") +class DeepgramNode(cpe.Node): + def __init__(self, api_key: str, name: str = "DeepgramNode"): + super().__init__(name=name) + self.api_key = api_key + self.deepgram_client: Optional[Deepgram] = None + + def setup(self) -> None: + self.deepgram_client = Deepgram(self.api_key) From 8797b95f99895529bb453a1d5dbf7ef390e8e0cd Mon Sep 17 00:00:00 2001 From: Umesh Timalsina Date: Mon, 11 Sep 2023 13:31:32 -0500 Subject: [PATCH 2/5] Add deepgram node for live transcription --- chimerapy/pipelines/__init__.py | 1 + .../pipelines/speech_to_text/deepgram_node.py | 86 ++++++++++++++++++- configs/displays/camera_test.json | 59 +++++++++++++ configs/speech_to_text/stt_deepgram.json | 62 +++++++++++++ pyproject.toml | 3 +- 5 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 configs/displays/camera_test.json create mode 100644 configs/speech_to_text/stt_deepgram.json diff --git a/chimerapy/pipelines/__init__.py b/chimerapy/pipelines/__init__.py index 5147277..c85a860 100644 --- a/chimerapy/pipelines/__init__.py +++ b/chimerapy/pipelines/__init__.py @@ -22,6 +22,7 @@ def register_nodes_metadata(): "chimerapy.pipelines.yolov8.multi_vid_pose:YoloV8Node", "chimerapy.pipelines.yolov8.multi_save:MultiSaveNode", "chimerapy.pipelines.yolov8.display:DisplayNode", + "chimerapy.pipelines.speech_to_text.deepgram_node:DeepgramNode", ], } diff --git a/chimerapy/pipelines/speech_to_text/deepgram_node.py b/chimerapy/pipelines/speech_to_text/deepgram_node.py index 2db4d44..037a7ab 100644 --- a/chimerapy/pipelines/speech_to_text/deepgram_node.py +++ b/chimerapy/pipelines/speech_to_text/deepgram_node.py @@ -1,4 +1,6 @@ -from typing import Optional +import asyncio +import json +from typing import Any, Dict, Optional from deepgram import Deepgram @@ -8,10 +10,90 @@ @step_node(name="CPPipelines_DeepgramNode") class DeepgramNode(cpe.Node): - def __init__(self, api_key: str, name: str = "DeepgramNode"): + """A node which transcribes live audio using Deepgram. + + Parameters + ---------- + api_key : str + The Deepgram API key + name : str, optional (default: "DeepgramNode") + The name of the node + chunk_key : str, optional (default: "audio_chunk") + The key of the audio chunk in the data chunk + deepgram_options : Dict[str, Any], optional (default: None) + Options to pass to the Deepgram client(deepgram.transcription.live) + """ + + def __init__( + self, + api_key: str, + name: str = "DeepgramNode", + chunk_key: str = "audio_chunk", + deepgram_options: Optional[Dict[str, Any]] = None, + ): super().__init__(name=name) self.api_key = api_key self.deepgram_client: Optional[Deepgram] = None + self.transcribers = {} + self.chunk_key = chunk_key + self.deepgram_options = deepgram_options or {} def setup(self) -> None: + """Setup the Deepgram client.""" self.deepgram_client = Deepgram(self.api_key) + + async def step( + self, data_chunks: Dict[str, cpe.DataChunk] + ) -> cpe.DataChunk: + """Transcribe the audio chunks.""" + for name, data_chunk in data_chunks.items(): + await self._create_transcription(name) + + transcriber = self.transcribers[name] + audio_chunk = data_chunk.get(self.chunk_key)["value"] + transcriber.send(audio_chunk) + + async def _create_transcription(self, name) -> None: + """Create a transcription for the given name.""" + if name not in self.transcribers: + try: + self.transcribers[ + name + ] = await self.deepgram_client.transcription.live( + self.deepgram_options + ) + except Exception as e: + self.logger.error( + f"Failed to create transcription for {name}: {e}" + ) + return + + transcriber = self.transcribers[name] + transcriber.registerHandler( + transcriber.event.CLOSE, + lambda c: print(f"Connection closed with code {c}."), + ) + transcriber.registerHandler( + transcriber.event.ERROR, lambda e: print(f"Error: {e}") + ) + transcriber.registerHandler( + transcriber.event.TRANSCRIPT_RECEIVED, + lambda t: self._save_transcript(name, t), + ) + self.logger.info(f"Created transcription for {name}") + + def _save_transcript(self, name, response) -> None: + """Save the transcript to a csv file.""" + transcript_data = { + "transcript": response["channel"]["alternatives"][0]["transcript"], + "conf": response["channel"]["alternatives"][0]["confidence"], + "start": response["start"], + "end": response["start"] + response["duration"], + "deepgram_json": json.dumps(response, indent=0), + } + self.save_tabular(name, transcript_data) + + def teardown(self) -> None: + """Finish all transcriptions.""" + for transcriber in self.transcribers.values(): + asyncio.create_task(transcriber.finish()) diff --git a/configs/displays/camera_test.json b/configs/displays/camera_test.json new file mode 100644 index 0000000..e0bae9a --- /dev/null +++ b/configs/displays/camera_test.json @@ -0,0 +1,59 @@ +{ + "mode": "preview", + "workers": { + "manager_ip": "129.59.104.153", + "manager_port": 9001, + "instances": [ + { + "name": "local", + "id": "local", + "description": "local worker for the MMLA pipeline demo with a video node" + } + ] + }, + "nodes": [ + { + "registry_name": "CPPipelines_Video", + "name": "test-1", + "kwargs": { + "video_src": 2, + "width": 1920, + "height": 600, + "frame_key": "frame", + "debug": true, + "include_meta": true, + "frame_rate": 30, + "save_name": "video-test-1" + }, + "package": "chimerapy-pipelines" + }, + { + "registry_name": "CPPipelines_ShowWindows", + "name": "show", + "kwargs": { + "window_xy": [ + 300, + 300 + ], + "items_per_row": 2 + }, + "package": "chimerapy-pipelines" + } + ], + "adj": [ + [ + "test-1", + "show" + ] + ], + "manager_config": { + "logdir": "cp-logs", + "port": 9001 + }, + "mappings": { + "local": [ + "test-1", + "show" + ] + } +} diff --git a/configs/speech_to_text/stt_deepgram.json b/configs/speech_to_text/stt_deepgram.json new file mode 100644 index 0000000..037ccc5 --- /dev/null +++ b/configs/speech_to_text/stt_deepgram.json @@ -0,0 +1,62 @@ +{ + "mode": "record", + "workers": { + "manager_ip": "129.59.104.153", + "manager_port": 9001, + "instances": [ + { + "name": "local", + "id": "local", + "description": "local worker for the MMLA pipeline for Speech to Text with deepgram" + } + ] + }, + "nodes": [ + { + "registry_name": "CPPipelines_AudioNode", + "name": "local-audio", + "kwargs": { + "backend": "pvrecorder", + "input_device_id": 3, + "audio_format": "INT16", + "sample_rate": "RATE_44100", + "chunk_size": "CHUNK_512", + "save_name": "local-audio", + "chunk_key": "audio_chunk" + }, + "package": "chimerapy-pipelines" + }, + { + "registry_name": "CPPipelines_DeepgramNode", + "name": "stt-deepgram", + "kwargs": { + "name": "stt-deepgram", + "api_key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "chunk_key": "audio_chunk", + "deepgram_options": { + "encoding": "linear16", + "channels": 1, + "sample_rate": 16000, + "language": "en-US" + } + }, + "package": "chimerapy-pipelines" + } + ], + "adj": [ + [ + "local-audio", + "stt-deepgram" + ] + ], + "manager_config": { + "logdir": "cp-logs", + "port": 9001 + }, + "mappings": { + "local": [ + "local-audio", + "stt-deepgram" + ] + } +} diff --git a/pyproject.toml b/pyproject.toml index c056fde..adb2d28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,8 @@ dependencies = [ 'chimerapy-orchestrator', 'chimerapy-engine', 'pyaudio', - 'pvrecorder' + 'pvrecorder', + 'deepgram-sdk' ] [project.optional-dependencies] From 92444aff55dddf14d681d358f24659089f5bc13d Mon Sep 17 00:00:00 2001 From: Umesh Timalsina Date: Mon, 11 Sep 2023 13:42:08 -0500 Subject: [PATCH 3/5] Async setup and teardown(WIP) --- chimerapy/pipelines/speech_to_text/deepgram_node.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/chimerapy/pipelines/speech_to_text/deepgram_node.py b/chimerapy/pipelines/speech_to_text/deepgram_node.py index 037a7ab..d5e4ffd 100644 --- a/chimerapy/pipelines/speech_to_text/deepgram_node.py +++ b/chimerapy/pipelines/speech_to_text/deepgram_node.py @@ -1,4 +1,3 @@ -import asyncio import json from typing import Any, Dict, Optional @@ -38,7 +37,7 @@ def __init__( self.chunk_key = chunk_key self.deepgram_options = deepgram_options or {} - def setup(self) -> None: + async def setup(self) -> None: """Setup the Deepgram client.""" self.deepgram_client = Deepgram(self.api_key) @@ -93,7 +92,7 @@ def _save_transcript(self, name, response) -> None: } self.save_tabular(name, transcript_data) - def teardown(self) -> None: + async def teardown(self) -> None: """Finish all transcriptions.""" for transcriber in self.transcribers.values(): - asyncio.create_task(transcriber.finish()) + await transcriber.finish() From 0cd1a0d849743788ed96d45c13b23639fbf4c356 Mon Sep 17 00:00:00 2001 From: Umesh Timalsina Date: Mon, 11 Sep 2023 14:47:14 -0500 Subject: [PATCH 4/5] Add save_json function for saving direct responses --- chimerapy/pipelines/speech_to_text/deepgram_node.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/chimerapy/pipelines/speech_to_text/deepgram_node.py b/chimerapy/pipelines/speech_to_text/deepgram_node.py index d5e4ffd..386a8e1 100644 --- a/chimerapy/pipelines/speech_to_text/deepgram_node.py +++ b/chimerapy/pipelines/speech_to_text/deepgram_node.py @@ -1,4 +1,3 @@ -import json from typing import Any, Dict, Optional from deepgram import Deepgram @@ -88,9 +87,9 @@ def _save_transcript(self, name, response) -> None: "conf": response["channel"]["alternatives"][0]["confidence"], "start": response["start"], "end": response["start"] + response["duration"], - "deepgram_json": json.dumps(response, indent=0), } self.save_tabular(name, transcript_data) + self.save_json(f"{name}-deepgram-responses", response) async def teardown(self) -> None: """Finish all transcriptions.""" From e5194d941beb637c9d92d794c2782f745bc74801 Mon Sep 17 00:00:00 2001 From: Umesh Timalsina Date: Mon, 11 Sep 2023 14:57:36 -0500 Subject: [PATCH 5/5] Remove unneeded config file --- configs/displays/camera_test.json | 59 ------------------------------- 1 file changed, 59 deletions(-) delete mode 100644 configs/displays/camera_test.json diff --git a/configs/displays/camera_test.json b/configs/displays/camera_test.json deleted file mode 100644 index e0bae9a..0000000 --- a/configs/displays/camera_test.json +++ /dev/null @@ -1,59 +0,0 @@ -{ - "mode": "preview", - "workers": { - "manager_ip": "129.59.104.153", - "manager_port": 9001, - "instances": [ - { - "name": "local", - "id": "local", - "description": "local worker for the MMLA pipeline demo with a video node" - } - ] - }, - "nodes": [ - { - "registry_name": "CPPipelines_Video", - "name": "test-1", - "kwargs": { - "video_src": 2, - "width": 1920, - "height": 600, - "frame_key": "frame", - "debug": true, - "include_meta": true, - "frame_rate": 30, - "save_name": "video-test-1" - }, - "package": "chimerapy-pipelines" - }, - { - "registry_name": "CPPipelines_ShowWindows", - "name": "show", - "kwargs": { - "window_xy": [ - 300, - 300 - ], - "items_per_row": 2 - }, - "package": "chimerapy-pipelines" - } - ], - "adj": [ - [ - "test-1", - "show" - ] - ], - "manager_config": { - "logdir": "cp-logs", - "port": 9001 - }, - "mappings": { - "local": [ - "test-1", - "show" - ] - } -}