diff --git a/.github/workflows/code-check.yml b/.github/workflows/code-check.yml
new file mode 100644
index 0000000..59eb799
--- /dev/null
+++ b/.github/workflows/code-check.yml
@@ -0,0 +1,37 @@
+name: Code Quality Checks
+on:
+ pull_request:
+ branches:
+ - main
+ push:
+ branches:
+ - main
+
+jobs:
+ code-quality:
+ name: Code Quality Checks
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout Code
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.12.9"
+
+ - name: Install Dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install -r dev-requirements.txt
+
+ - name: Run Ruff
+ run: |
+ ruff check .
+
+ - name: Run isort
+ run: |
+ isort . --check-only
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ab61f2f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,168 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+# This is especially recommended for binary packages to ensure reproducibility, and is more
+# commonly ignored for libraries.
+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+# in version control.
+# https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+# and can be added to the global gitignore or merged into this file. For a more nuclear
+# option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
+
+.vscode
+
+# Local History for Visual Studio Code
+.history/
+
+# Built Visual Studio Code Extensions
+*.vsix
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..eaa2696
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,14 @@
+repos:
+ - repo: https://github.com/astral-sh/ruff-pre-commit
+ rev: v0.11.4
+ hooks:
+ - id: ruff
+ args: [--fix]
+ - id: ruff-format
+ #args: [--force-exclude=false]
+
+ - repo: https://github.com/pycqa/isort
+ rev: 6.0.1
+ hooks:
+ - id: isort
+ args: ["--profile", "black"]
diff --git a/README.md b/README.md
index 123366e..be4c916 100644
--- a/README.md
+++ b/README.md
@@ -1,150 +1,625 @@
-# Djelia Python Client
-
-Python client for the Djelia API, providing language services for Bambara .
+#
Djelia Python SDK Workshop
+Hey there! Welcome to this fun and practical workshop on using the Djelia Python SDK. Whether you're translating between African languages, transcribing audio with realtime streaming, or generating natural sounding speech, this guide has got you covered. We'll walk through installing the SDK, setting up clients, and performing some cool operations like multi language translation, audio transcription, and text-to-speech generation. I've added a sprinkle of humor to keep things light because who said coding can't be fun, right? Let's dive in!
+
+## Table of Contents
+
+1. [Installation](#installation)
+2. [Client Initialization](#client-initialization)
+ - 2.1 [API Key Loading](#api-key-loading)
+ - 2.2 [Synchronous Client](#synchronous-client)
+ - 2.3 [Asynchronous Client](#asynchronous-client)
+3. [Operations](#operations)
+ - 3.1 [Translation](#translation)
+ - 3.1.1 [Get Supported Languages](#get-supported-languages)
+ - 3.1.2 [Translate Text](#translate-text)
+ - 3.2 [Transcription](#transcription)
+ - 3.2.1 [Basic Transcription](#basic-transcription)
+ - 3.2.2 [Streaming Transcription](#streaming-transcription)
+ - 3.2.3 [French Translation](#french-translation)
+ - 3.3 [Text-to-Speech (TTS)](#text-to-speech-tts)
+ - 3.3.1 [TTS v1 with Speaker ID](#tts-v1-with-speaker-id)
+ - 3.3.2 [TTS v2 with Natural Descriptions](#tts-v2-with-natural-descriptions)
+ - 3.3.3 [Streaming TTS](#streaming-tts)
+ - 3.4 [Version Management](#version-management)
+ - 3.5 [Parallel Operations](#parallel-operations)
+4. [Error Handling](#error-handling)
+5. [Explore the Djelia SDK Cookbook](#explore-the-djelia-sdk-cookbook)
+
+## Installation
+
+Let's kick things off by installing the Djelia Python SDK with one of those magical commands. Run it in your terminal, and you're good to go!
+
+
+Install the Djelia Python SDK directly from GitHub:
+```bash
+ pip install git+https://github.com/djelia/djelia-python-sdk.git
+```
-## Installation
+Alternatively, use uv for faster dependency resolution:
```bash
-pip install git+https://github.com/djelia-org/djelia-python-client
+ uv pip install git+https://github.com/djelia/djelia-python-sdk.git
```
-## Quick Start
+Note: A PyPI package (pip install djelia) is coming soon stay tuned!
+
+
+## Client Initialization
+
+Before we can do anything fancy, we need to set up our clients. This involves loading our API key and initializing both synchronous and asynchronous clients. Here's how:
+
+## API Key Loading
+
+First, grab your API key from a `.env` file it's the safest way to keep your secrets, well, secret! If you don't have one yet, head to the Djelia dashboard and conjure one up.
+
+```python
+from dotenv import load_dotenv
+import os
+
+load_dotenv()
+api_key = os.environ.get("DJELIA_API_KEY")
+
+# Alternatively: api_key = "your_api_key_here" (but shh, that's not safe!)
+
+# Specify your audio file for transcription tests
+audio_file_path = os.environ.get("TEST_AUDIO_FILE", "audio.wav")
+```
+
+> **Note:** Ensure your audio file (e.g., `audio.wav`) exists at the specified path. Set `TEST_AUDIO_FILE` in your `.env` file if using a custom path:
+> ```bash
+> echo "TEST_AUDIO_FILE=/path/to/your/audio.wav" >> .env
+> ```
+> Without a valid audio file, transcription operations will fail. That not what you want right 😂
+
+
+Synchronous Client
+
+
-### Authentication
+For those who like to take things one step at a time, here's how to set up the synchronous client:
```python
from djelia import Djelia
-# Using API key directly
-client = Djelia(api_key="your_api_key")
+djelia_client = Djelia(api_key=api_key)
+
+# if DJELIA_API_KEY is already set you can just do : (yes I know I'm making your life easy 😂)
+djelia_client = Djelia()
+
+```
+
+## Asynchronous Client
+
+If you're ready to live on the async edge, initialize the asynchronous client like this:
+
+```python
+from djelia import DjeliaAsync
+
+djelia_async_client = DjeliaAsync(api_key=api_key)
+
+# if DJELIA_API_KEY is already set you can just do : (again easy life 😂)
+
+djelia_async_client = DjeliaAsync()
+```
+
+## Operations 🇲🇱
+
+ Now for the fun part let's do stuff with the Djelia API! We'll cover translating between African languages, transcribing audio (with streaming!), and generating natural speech, with examples for both synchronous and asynchronous approaches. Yes, yes, let's do it ❤️🔥!
+
+
+## Translation
+
+Let's unlock the power of multilingual communication!
+## Get Supported Languages
+
+First, let's see what languages we can work with.
+
+## Synchronous
+
+Simple and straightforward get your supported languages and print them:
-# Or use environment variable
-# export DJELIA_API_KEY=your_api_key
-client = Djelia()
+```python
+supported_languages = djelia_client.translation.get_supported_languages()
+for lang in supported_languages:
+ print(f"{lang.name}: {lang.code}")
+```
+
+## Asynchronous
+
+For the async fans, here's how to fetch supported languages. Don't forget to run it with asyncio:
+
+```python
+import asyncio
+
+async def get_languages_test():
+ async with djelia_async_client as client:
+ supported_languages = await client.translation.get_supported_languages()
+ for lang in supported_languages:
+ print(f"{lang.name}: {lang.code}")
+
+asyncio.run(get_languages_test())
```
-### Translation
+## Translate Text
-Translate text between supported languages:
+Let's translate some text between beautiful 🇲🇱 languages and others. Feel free to try different language combinations!
```python
-result = client.translate(
- text="Hello, how are you?",
- source="en", # English
- target="bam" # Bambara
+from djelia.models import TranslationRequest, Language, Versions
+
+request = TranslationRequest(
+ text="Hello, how are you today?",
+ source=Language.ENGLISH,
+ target=Language.BAMBARA
)
-print(result["text"])
```
-### Transcription
+## Synchronous
+
+Create that translation and see what you get:
+
+```python
+from djelia.models import TranslationResponse
+
+try:
+ response_sync: TranslationResponse = djelia_client.translation.translate(request=request, version=Versions.v1)
+ print(f"Original: {request.text}")
+ print(f"Translation: {response_sync.text}")
+except Exception as e:
+ print(f"Translation error: {e}")
+```
+
+## Asynchronous
+
+Async translation because why wait around? Let's do it bro !
+
+```python
+async def translate_async():
+ async with djelia_async_client as client:
+ try:
+ response_async: TranslationResponse = await client.translation.translate(request=request, version=Versions.v1)
+ print(f"Original: {request.text}")
+ print(f"Translation: {response_async.text}")
+ return response_async
+ except Exception as e:
+ print(f"Translation error: {e}")
+
+asyncio.run(translate_async())
+```
+
+## Transcription
+
+Time to turn audio into text with timestamps and everything!
+
+## Basic Transcription
+
+Let's transcribe some audio files. Make sure you have an audio file ready check audio_file_path.
+
+## Synchronous
+
+```python
+from djelia.models import Versions
+
+try:
+ transcription = djelia_client.transcription.transcribe(
+ audio_file=audio_file_path,
+ version=Versions.v2
+ )
+ print(f"Transcribed {len(transcription)} segments:")
+ for segment in transcription:
+ print(f"[{segment.start:.2f}s - {segment.end:.2f}s]: {segment.text}")
+except Exception as e:
+ print(f"Transcription error: {e}")
+```
+
+## Asynchronous
+
+For the async enthusiasts: (like me, I ❤️ it)
+
+```python
+async def transcribe_async():
+ async with djelia_async_client as client:
+ try:
+ transcription = await client.transcription.transcribe(
+ audio_file=audio_file_path,
+ version=Versions.v2
+ )
+ print(f"Transcribed {len(transcription)} segments:")
+ for segment in transcription:
+ print(f"[{segment.start:.2f}s - {segment.end:.2f}s]: {segment.text}")
+ except Exception as e:
+ print(f"Transcription error: {e}")
+
+asyncio.run(transcribe_async())
+```
+
+## Streaming Transcription
+
+Want realtime results? Let's stream that transcription! This is really important of live applications
+
+## Synchronous
+
+```python
+print("Streaming transcription (showing first 3 segments)...")
+segment_count = 0
+
+try:
+ for segment in djelia_client.transcription.transcribe(
+ audio_file=audio_file_path,
+ stream=True,
+ version=Versions.v2
+ ):
+ segment_count += 1
+ print(f"Segment {segment_count}: [{segment.start:.2f}s]: {segment.text}")
+
+ if segment_count >= 3: # Just showing first 3 for demo
+ print("...and more segments!")
+ break
+except Exception as e:
+ print(f"Streaming transcription error: {e}")
+```
+
+## Asynchronous
+
+Async streaming because realtime is awesome: (bro, I'm telling you, one second is a lot)
+
+```python
+async def stream_transcribe_async():
+ async with djelia_async_client as client:
+ try:
+ stream = await client.transcription.transcribe(
+ audio_file=audio_file_path,
+ stream=True,
+ version=Versions.v2
+ )
+ segment_count = 0
+ async for segment in stream:
+ segment_count += 1
+ print(f"Segment {segment_count}: [{segment.start:.2f}s]: {segment.text}")
+
+ if segment_count >= 3: # Just showing first 3 for demo
+ print("...and more segments!")
+ break
+ except Exception as e:
+ print(f"Streaming transcription error: {e}")
+
+asyncio.run(stream_transcribe_async())
+```
+
+## French Translation
+
+Want to transcribe and translate to French in one go? We've got you covered!
+
+## Synchronous
+
+```python
+try:
+ french_transcription = djelia_client.transcription.transcribe(
+ audio_file=audio_file_path,
+ translate_to_french=True,
+ version=Versions.v2
+ )
+ print(f"French translation: {french_transcription.text}")
+except Exception as e:
+ print(f"French transcription error: {e}")
+```
+
+## Asynchronous
+
+```python
+async def transcribe_french_async():
+ async with djelia_async_client as client:
+ try:
+ french_transcription = await client.transcription.transcribe(
+ audio_file=audio_file_path,
+ translate_to_french=True,
+ version=Versions.v2
+ )
+ print(f"French translation: {french_transcription.text}")
+ except Exception as e:
+ print(f"French transcription error: {e}")
+
+asyncio.run(transcribe_french_async())
+```
+
+## Text-to-Speech (TTS)
-Convert speech to text from audio files:
+Let's make some beautiful voices! Choose between numbered speakers or describe exactly how you want it to sound.
+
+## TTS v1 with Speaker ID
+
+Classic approach with speaker IDs (0-4). Simple and effective!
```python
-# Basic transcription
-result = client.transcribe("audio_file.mp3")
+from djelia.models import TTSRequest
+
+tts_request_v1 = TTSRequest(
+ text="Aw ni ce, i ka kɛnɛ wa?", # "Hello, how are you?" in Bambara
+ speaker=1 # Choose from 0, 1, 2, 3, or 4
+)
+```
-# With timestamps (version 2)
-result = client.transcribe("audio_file.mp3", version=2)
-for segment in result:
- print(f"{segment['text']}")
+## Synchronous
-# With French translation
-result = client.transcribe("audio_file.mp3", translate_to_french=True)
+Generate that audio and save it:
+
+```python
+try:
+ audio_file_v1 = djelia_client.tts.text_to_speech(
+ request=tts_request_v1,
+ output_file="hello_v1.wav",
+ version=Versions.v1
+ )
+ print(f"Audio saved to: {audio_file_v1}")
+except Exception as e:
+ print(f"TTS v1 error: {e}")
```
-### Streaming Transcription
+## Asynchronous
-Process audio in real-time:
+Async audio generation:
```python
-for segment in client.stream_transcribe("audio_file.mp3"):
- print(segment["text"])
+async def generate_audio_v1_async():
+ async with djelia_async_client as client:
+ try:
+ audio_file_v1 = await client.tts.text_to_speech(
+ request=tts_request_v1,
+ output_file="hello_v1_async.wav",
+ version=Versions.v1
+ )
+ print(f"Audio saved to: {audio_file_v1}")
+ except Exception as e:
+ print(f"TTS v1 error: {e}")
+
+asyncio.run(generate_audio_v1_async())
```
-### Text-to-Speech
+## TTS v2 with Natural Descriptions
-Convert text to natural-sounding speech:
+This is where it gets fun! Describe exactly how you want the voice to sound, but make sure to include one of the supported speakers: Moussa, Sekou, or Seydou.
```python
-# Generate audio and save to file
-client.text_to_speech(
- "Text to convert to speech",
- speaker=1, # Choose voice (0-4)
- output_file="output.mp3"
+from djelia.models import TTSRequestV2
+
+tts_request_v2 = TTSRequestV2(
+ text="Aw ni ce, i ka kɛnɛ wa?",
+ description="Seydou speaks with a warm, welcoming tone", # Must include Moussa, Sekou, or Seydou
+ chunk_size=1.0 # Control speech pacing (0.1 - 2.0)
)
+```
+
+> **Note:** The description field must include one of the supported speakers. For example, "Moussa speaks with a warm tone" is valid, but "Natural tone" will raise an error.
+
+## Synchronous
+
+Create natural sounding speech:
+
+```python
+try:
+ audio_file_v2 = djelia_client.tts.text_to_speech(
+ request=tts_request_v2,
+ output_file="hello_v2.wav",
+ version=Versions.v2
+ )
+ print(f"Natural audio saved to: {audio_file_v2}")
+except Exception as e:
+ print(f"TTS v2 error: {e}")
+```
+
+## Asynchronous
+
+Async natural speech generation:
-# Get audio as bytes
-audio_bytes = client.text_to_speech("Hello world")
+```python
+async def generate_natural_audio_async():
+ async with djelia_async_client as client:
+ try:
+ audio_file_v2 = await client.tts.text_to_speech(
+ request=tts_request_v2,
+ output_file="hello_v2_async.wav",
+ version=Versions.v2
+ )
+ print(f"Natural audio saved to: {audio_file_v2}")
+ except Exception as e:
+ print(f"TTS v2 error: {e}")
+
+asyncio.run(generate_natural_audio_async())
```
-## Async Support
+## Streaming TTS
-For high-performance applications, use the async client:
+Realtime audio generation! Get chunks as they're created (v2 only).
```python
-from djelia import AsyncDjelia
-import asyncio
+streaming_tts_request = TTSRequestV2(
+ text="An filɛ ni ye yɔrɔ minna ni an ye an sigi ka a layɛ yala an bɛ ka baara min kɛ ɛsike a kɛlen don ka Ɲɛ wa, ...............", # a very long text
+ description="Seydou speaks clearly and naturally",
+ chunk_size=1.0
+)
+```
+
+> **Note:** By default, the SDK may process multiple chunks (e.g., up to 5 in some configurations). This example limits to 5 chunks for consistency, but you can adjust the limit based on your application needs.
-async def main():
- async with AsyncDjelia(api_key="your_api_key") as client:
- # All methods are the same, just use "await"
- result = await client.translate("Hello", "en", "fr")
- print(result["text"])
+## Synchronous
+
+Stream that audio generation: (this is handsome)
+
+```python
+print("Streaming TTS generation...")
+chunk_count = 0
+total_bytes = 0
+max_chunks = 5
+
+try:
+ for chunk in djelia_client.tts.text_to_speech(
+ request=streaming_tts_request,
+ output_file="streamed_audio.wav",
+ stream=True,
+ version=Versions.v2
+ ):
+ chunk_count += 1
+ total_bytes += len(chunk)
+ print(f"Chunk {chunk_count}: {len(chunk)} bytes")
- # Streaming has a slightly different syntax
- async for segment in client.stream_transcribe("audio.mp3"):
- print(segment["text"])
+ if chunk_count >= max_chunks:
+ print(f"...and more chunks! (Total so far: {total_bytes} bytes)")
+ break
+except Exception as e:
+ print(f"Streaming TTS error: {e}")
+```
+
+## Asynchronous
-asyncio.run(main())
+Async streaming TTS because realtime is the future (oops, actually it's today 😂):
+
+```python
+async def stream_tts_async():
+ async with djelia_async_client as client:
+ try:
+ stream = await client.tts.text_to_speech(
+ request=streaming_tts_request,
+ output_file="streamed_audio_async.wav",
+ stream=True,
+ version=Versions.v2
+ )
+ chunk_count = 0
+ total_bytes = 0
+ max_chunks = 5
+
+ async for chunk in stream:
+ chunk_count += 1
+ total_bytes += len(chunk)
+ print(f"Chunk {chunk_count}: {len(chunk)} bytes")
+
+ if chunk_count >= max_chunks:
+ print(f"...and more chunks! (Total so far: {total_bytes} bytes)")
+ break
+ except Exception as e:
+ print(f"Streaming TTS error: {e}")
+
+asyncio.run(stream_tts_async())
```
-### Parallel Processing
+## Version Management
-Run multiple operations simultaneously for better performance:
+The SDK supports multiple API versions (v1, v2) via the Versions enum. Use `Versions.latest()` to get the latest version or `Versions.all_versions()` to list available versions.
```python
-async def main():
- async with AsyncDjelia(api_key="your_api_key") as client:
- # Run tasks in parallel
- results = await asyncio.gather(
- client.translate("Hello", "en", "bam"),
- client.get_supported_languages(),
- client.text_to_speech("Aw ni ce", output_file="greeting.wav")
- )
-
- # Access results
- translation, languages, audio_path = results
+from djelia.models import Versions
+
+print(f"Latest version: {Versions.latest()}")
+print(f"Available versions: {[str(v) for v in Versions.all_versions()]}")
-asyncio.run(main())
+# Use specific version
+try:
+ transcription = djelia_client.transcription.transcribe(
+ audio_file=audio_file_path,
+ version=Versions.v2
+ )
+ print(f"Transcribed {len(transcription)} segments")
+except Exception as e:
+ print(f"Transcription error: {e}")
```
-## Supported Languages
+## Parallel Operations
-Currently supports:
-- English (en)
-- French (fr)
-- Bambara (bam)
+Run multiple API operations concurrently using `asyncio.gather` with the async client. This is great for performance in applications needing simultaneous translations, transcriptions, or TTS generation.
-Check available languages:
```python
-languages = client.get_supported_languages()
+import asyncio
+from djelia.models import TranslationRequest, Language, TTSRequestV2, Versions
+
+async def parallel_operations():
+ async with DjeliaAsync(api_key=api_key) as client:
+ try:
+ translation_request = TranslationRequest(
+ text="Hello", source=Language.ENGLISH, target=Language.BAMBARA
+ )
+ tts_request = TTSRequestV2(
+ text="Aw ni ce, i ka kɛnɛ wa?",
+ description="Moussa speaks with a clear tone",
+ chunk_size=1.0
+ )
+
+ results = await asyncio.gather(
+ client.translation.translate(translation_request, version=Versions.v1),
+ client.transcription.transcribe(audio_file_path, version=Versions.v2),
+ client.tts.text_to_speech(tts_request, output_file="parallel_tts.wav", version=Versions.v2),
+ return_exceptions=True
+ )
+
+ for i, result in enumerate(results):
+ if isinstance(result, Exception):
+ print(f"Operation {i+1} failed: {result}")
+ else:
+ print(f"Operation {i+1} succeeded: {type(result).__name__}")
+ except Exception as e:
+ print(f"Parallel operations error: {e}")
+
+asyncio.run(parallel_operations())
```
-## Error Handling
+## Error Handling
-Handle API errors gracefully:
+The Djelia SDK provides specific exception classes to handle errors gracefully. Use these to catch and respond to issues like invalid API keys, unsupported languages, or incorrect speaker descriptions.
```python
-from djelia import Djelia, DjeliaError
+from djelia.utils.exceptions import AuthenticationError, APIError, ValidationError, LanguageError, SpeakerError
try:
- client = Djelia(api_key="your_api_key")
- result = client.translate("Hello", "en", "bam")
-except DjeliaError as e:
- print(f"API error: {e}")
+ response = djelia_client.translation.translate(request=request, version=Versions.v1)
+ print(f"Translation: {response.text}")
+except AuthenticationError as e:
+ print(f"Authentication error (check API key): {e}")
+except LanguageError as e:
+ print(f"Invalid or unsupported language: {e}")
+except ValidationError as e:
+ print(f"Validation error (e.g., invalid input): {e}")
+except APIError as e:
+ print(f"API error (status {e.status_code}): {e.message}")
+except Exception as e:
+ print(f"Unexpected error: {e}")
```
-## Support
+## Common Exceptions:
+
+- **AuthenticationError**: Invalid or expired API key (HTTP 401).
+- **APIError**: General API issues, including forbidden access (403) or resource not found (404).
+- **ValidationError**: Invalid inputs, such as missing audio files or incorrect parameters (422).
+- **LanguageError**: Unsupported source or target language.
+- **SpeakerError**: Invalid speaker ID (TTS v1) or description missing a supported speaker (TTS v2).
+
+Check logs for detailed errors, and ensure your `.env` file includes a valid `DJELIA_API_KEY` and `TEST_AUDIO_FILE`.
+
+## Explore the Djelia SDK Cookbook
+
+Want to take your Djelia SDK skills to the next level? Check out the **Djelia SDK Cookbook** for a comprehensive example that puts it all together! The cookbook demonstrates:
+
+- **Full Test Suite**: Run synchronous and asynchronous tests for translation, transcription, and TTS, with detailed summaries.
+- **Error Handling**: Robust try-except blocks and logging to catch and debug issues.
+- **Configuration Management**: Load API keys and audio paths from a `.env` file with validation.
+- **Advanced Features**: Parallel API operations, version management, and streaming capabilities.
+- **Modular Design**: Organized code structure for easy customization.
+
+To run the cookbook, clone the repository, install dependencies, and execute:
+
+```bash
+git clone https://github.com/djelia/djelia-python-sdk.git
+pip install git+https://github.com/djelia/djelia-python-sdk.git python-dotenv
+
+cd djelia-python-sdk
+python -m cookbook.main
+```
+
+Make sure your `.env` file includes `DJELIA_API_KEY` and `TEST_AUDIO_FILE`. The cookbook is perfect for developers who want a ready-to-use template for building real-world applications with the Djelia SDK.
+
+## Wrapping Up
+
+And there you have it a full workshop on using the Djelia Python SDK! You've installed it, set up clients, and mastered translation, transcription, and text-to-speech both synchronously and asynchronously. Pretty cool, right? Feel free to tweak the code, explore different languages and voices, and check out the Djelia SDK Cookbook for a deeper dive.
+
+**Pro tip**: The async methods are perfect for applications that need to handle multiple operations simultaneously. The streaming features are fantastic for realtime applications. And remember, Bambara is just one of the beautiful African languages you can work with!
+
+IMPORTANT: If you encounter any issues, please create an issue in the repository, explain the problem you encountered (include logs if possible), and tag @sudoping01.
-Need help? Contact: support@djelia.cloud
+**Great job, bro 🫂! This is a fantastic integration guide built with ❤️ for 🇲🇱 and beyond!**
\ No newline at end of file
diff --git a/cookbook/__init__.py b/cookbook/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cookbook/config.py b/cookbook/config.py
new file mode 100644
index 0000000..5a03a0e
--- /dev/null
+++ b/cookbook/config.py
@@ -0,0 +1,21 @@
+import os
+from dataclasses import dataclass
+from typing import Optional
+
+from dotenv import load_dotenv
+
+
+@dataclass
+class Config:
+ api_key: Optional[str] = None
+ audio_file_path: str = "audio.wav"
+ max_stream_segments: int = 3
+ max_stream_chunks: int = 5
+
+ @classmethod
+ def load(cls) -> "Config":
+ load_dotenv()
+ return cls(
+ api_key=os.environ.get("DJELIA_API_KEY"),
+ audio_file_path=os.environ.get("TEST_AUDIO_FILE", "audio.wav"),
+ )
diff --git a/cookbook/cookbook.py b/cookbook/cookbook.py
new file mode 100644
index 0000000..53afcd2
--- /dev/null
+++ b/cookbook/cookbook.py
@@ -0,0 +1,702 @@
+import asyncio
+import logging
+import os
+import traceback
+from typing import List
+from uuid import uuid4
+
+from djelia import Djelia, DjeliaAsync
+from djelia.models import (Language, SupportedLanguageSchema,
+ TranslationRequest, TranslationResponse, TTSRequest,
+ TTSRequestV2, Versions)
+
+from .config import Config
+from .utils import (ConsoleColor, handle_transcription_result, print_error,
+ print_info, print_success, print_summary, process_result)
+
+# ================================================
+# Djelia Cookbook
+# ================================================
+
+
+class DjeliaCookbook:
+ def __init__(self, config: Config):
+ self.config = config
+ self.sync_client = Djelia(api_key=config.api_key)
+ self.async_client = DjeliaAsync(api_key=config.api_key)
+ self.test_results = {}
+
+ self.translation_samples = [
+ ("Hello, how are you?", Language.ENGLISH, Language.BAMBARA),
+ ("Bonjour, comment allez-vous?", Language.FRENCH, Language.BAMBARA),
+ ("Good morning", Language.ENGLISH, Language.FRENCH),
+ ]
+ self.bambara_tts_text = "Aw ni ce, i ka kɛnɛ wa?"
+ self.supported_speakers = ["Moussa", "Sekou", "Seydou"]
+
+ # ------------------------------
+ # Setup Validation
+ # ------------------------------
+
+ def validate_setup(self) -> bool:
+ valid = True
+
+ if not self.config.api_key:
+ print_error("DJELIA_API_KEY not found")
+ valid = False
+
+ if not os.path.exists(self.config.audio_file_path):
+ print_error(f"Audio file not found: {self.config.audio_file_path}")
+ valid = False
+
+ if valid:
+ print_success("API Key and audio file loaded")
+
+ return valid
+
+ # ------------------------------
+ # Translation Tests
+ # ------------------------------
+
+ def test_translation_sync(self) -> None:
+ test_name = "Sync Translation"
+ print(
+ f"{ConsoleColor.CYAN}\n{'SYNCHRONOUS TRANSLATION':^60}{ConsoleColor.RESET}"
+ )
+
+ try:
+ languages: List[SupportedLanguageSchema] = (
+ self.sync_client.translation.get_supported_languages()
+ )
+ print_success(f"Supported languages: {len(languages)}")
+
+ for text, source, target in self.translation_samples:
+ request = TranslationRequest(text=text, source=source, target=target)
+ response: TranslationResponse = self.sync_client.translation.translate(
+ request=request, version=Versions.v1
+ )
+ print_success(
+ f"{source.value} → {target.value}: "
+ f"'{text}' → '{ConsoleColor.YELLOW}{response.text}{ConsoleColor.RESET}'"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"{len(self.translation_samples)} translations",
+ )
+
+ except Exception as e:
+ print_error(f"Translation error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ async def test_translation_async(self) -> None:
+ test_name = "Async Translation"
+ print(
+ f"{ConsoleColor.PURPLE}\n{'ASYNCHRONOUS TRANSLATION':^60}{ConsoleColor.RESET}"
+ )
+
+ async with self.async_client as client:
+ try:
+ languages = await client.translation.get_supported_languages()
+ print_success(f"Supported languages (async): {len(languages)}")
+
+ for text, source, target in self.translation_samples:
+ request = TranslationRequest(
+ text=text, source=source, target=target
+ )
+ response = await client.translation.translate(
+ request=request, version=Versions.v1
+ )
+ print_success(
+ f"{source.value} → {target.value} (async): "
+ f"'{text}' → '{ConsoleColor.YELLOW}{response.text}{ConsoleColor.RESET}'"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"{len(self.translation_samples)} translations",
+ )
+
+ except Exception as e:
+ print_error(f"Async translation error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ # ------------------------------
+ # Transcription Tests
+ # ------------------------------
+
+ def test_transcription_sync(self) -> None:
+ test_name = "Sync Transcription"
+ print(
+ f"{ConsoleColor.CYAN}\n{'SYNCHRONOUS TRANSCRIPTION':^60}{ConsoleColor.RESET}"
+ )
+
+ if not os.path.exists(self.config.audio_file_path):
+ print_error("Audio file missing")
+ self.test_results[test_name] = ("Failed", "Missing audio file")
+ return
+
+ for version in [Versions.v1, Versions.v2]:
+ try:
+ print_info(f"Testing non-streaming v{version.value}")
+ transcription = self.sync_client.transcription.transcribe(
+ self.config.audio_file_path, version=version
+ )
+ handle_transcription_result(transcription, f"v{version.value}")
+
+ if version == Versions.v2:
+ print_info("Testing French translation")
+ transcription_fr = self.sync_client.transcription.transcribe(
+ self.config.audio_file_path,
+ translate_to_french=True,
+ version=version,
+ )
+ print_success(
+ f"French translation (v{version.value}): "
+ f"'{ConsoleColor.YELLOW}{transcription_fr.text}{ConsoleColor.RESET}'"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"v{version.value} completed",
+ )
+
+ except Exception as e:
+ print_error(f"Transcription error (v{version.value}): {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ async def test_transcription_async(self) -> None:
+ test_name = "Async Transcription"
+ print(
+ f"{ConsoleColor.PURPLE}\n{'ASYNCHRONOUS TRANSCRIPTION':^60}{ConsoleColor.RESET}"
+ )
+
+ if not os.path.exists(self.config.audio_file_path):
+ print_error("Audio file missing")
+ self.test_results[test_name] = ("Failed", "Missing audio file")
+ return
+
+ async with self.async_client as client:
+ for version in [Versions.v1, Versions.v2]:
+ try:
+ print_info(f"Testing non-streaming v{version.value}")
+ transcription = await client.transcription.transcribe(
+ self.config.audio_file_path, version=version
+ )
+ handle_transcription_result(
+ transcription, f"v{version.value} (async)"
+ )
+
+ if version == Versions.v2:
+ print_info("Testing French translation")
+ transcription_fr = await client.transcription.transcribe(
+ self.config.audio_file_path,
+ translate_to_french=True,
+ version=version,
+ )
+ print_success(
+ f"French translation (async): "
+ f"'{ConsoleColor.YELLOW}{transcription_fr.text}{ConsoleColor.RESET}'"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"v{version.value} completed",
+ )
+
+ except Exception as e:
+ print_error(f"Async transcription error (v{version.value}): {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ # ------------------------------
+ # Streaming Transcription
+ # ------------------------------
+
+ def test_streaming_transcription_sync(self) -> None:
+ test_name = "Sync Streaming Transcription"
+ print(
+ f"{ConsoleColor.CYAN}\n{'SYNCHRONOUS STREAMING TRANSCRIPTION':^60}{ConsoleColor.RESET}"
+ )
+
+ if not os.path.exists(self.config.audio_file_path):
+ print_error("Audio file missing")
+ self.test_results[test_name] = ("Failed", "Missing audio file")
+ return
+
+ for version in [Versions.v1, Versions.v2]:
+ try:
+ print_info(f"Testing streaming v{version.value}")
+ segment_count = 0
+
+ for segment in self.sync_client.transcription.transcribe(
+ self.config.audio_file_path, stream=True, version=version
+ ):
+ segment_count += 1
+ print_success(
+ f"Segment {segment_count}: "
+ f"{segment.start:.2f}s-{segment.end:.2f}s: "
+ f"'{ConsoleColor.YELLOW}{segment.text}{ConsoleColor.RESET}'"
+ )
+
+ if segment_count >= self.config.max_stream_segments:
+ print_info(
+ f"Showing first {self.config.max_stream_segments} segments"
+ )
+ break
+
+ print_success(f"Streaming complete: {segment_count} segments")
+
+ if version == Versions.v2:
+ print_info("Testing streaming French translation")
+ segment_count = 0
+
+ for segment in self.sync_client.transcription.transcribe(
+ self.config.audio_file_path,
+ stream=True,
+ translate_to_french=True,
+ version=version,
+ ):
+ segment_count += 1
+ text = segment.text
+ print_success(
+ f"French Segment {segment_count}: "
+ f"'{ConsoleColor.YELLOW}{text}{ConsoleColor.RESET}'"
+ )
+
+ if segment_count >= self.config.max_stream_segments:
+ print_info(
+ f"Showing first {self.config.max_stream_segments} segments"
+ )
+ break
+
+ print_success(
+ f"French streaming complete: {segment_count} segments"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"v{version.value} completed",
+ )
+
+ except Exception as e:
+ print_error(f"Streaming error (v{version.value}): {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ async def test_streaming_transcription_async(self) -> None:
+ test_name = "Async Streaming Transcription"
+ print(
+ f"{ConsoleColor.PURPLE}\n{'ASYNCHRONOUS STREAMING TRANSCRIPTION':^60}{ConsoleColor.RESET}"
+ )
+
+ if not os.path.exists(self.config.audio_file_path):
+ print_error("Audio file missing")
+ self.test_results[test_name] = ("Failed", "Missing audio file")
+ return
+
+ async with self.async_client as client:
+ for version in [Versions.v1, Versions.v2]:
+ try:
+ print_info(f"Testing streaming v{version.value}")
+ segment_count = 0
+
+ generator = await client.transcription.transcribe(
+ self.config.audio_file_path, stream=True, version=version
+ )
+
+ async for segment in generator:
+ segment_count += 1
+ print_success(
+ f"Segment {segment_count}: "
+ f"{segment.start:.2f}s-{segment.end:.2f}s: "
+ f"'{ConsoleColor.YELLOW}{segment.text}{ConsoleColor.RESET}'"
+ )
+
+ if segment_count >= self.config.max_stream_segments:
+ print_info(
+ f"Showing first {self.config.max_stream_segments} segments"
+ )
+ break
+
+ print_success(f"Streaming complete: {segment_count} segments")
+
+ if version == Versions.v2:
+ print_info("Testing streaming French translation")
+ segment_count = 0
+
+ generator = await client.transcription.transcribe(
+ self.config.audio_file_path,
+ stream=True,
+ translate_to_french=True,
+ version=version,
+ )
+
+ async for segment in generator:
+ segment_count += 1
+ text = segment.text
+ print_success(
+ f"French Segment {segment_count}: "
+ f"'{ConsoleColor.YELLOW}{text}{ConsoleColor.RESET}'"
+ )
+
+ if segment_count >= self.config.max_stream_segments:
+ print_info(
+ f"Showing first {self.config.max_stream_segments} segments"
+ )
+ break
+
+ print_success(
+ f"French streaming complete: {segment_count} segments"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"v{version.value} completed",
+ )
+
+ except Exception as e:
+ print_error(f"Async streaming error (v{version.value}): {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ # ------------------------------
+ # TTS Tests
+ # ------------------------------
+
+ def test_tts_sync(self) -> None:
+ test_name = "Sync TTS"
+ print(
+ f"{ConsoleColor.CYAN}\n{'SYNCHRONOUS TEXT-TO-SPEECH':^60}{ConsoleColor.RESET}"
+ )
+
+ try:
+ tts_request_v1 = TTSRequest(text=self.bambara_tts_text, speaker=1)
+ audio_file_v1 = self.sync_client.tts.text_to_speech(
+ request=tts_request_v1,
+ output_file=f"tts_sync_v1_{uuid4().hex}.wav",
+ version=Versions.v1,
+ )
+ print_success(
+ f"TTS v1 saved: {ConsoleColor.BLUE}{audio_file_v1}{ConsoleColor.RESET}"
+ )
+
+ for speaker in self.supported_speakers:
+ tts_request_v2 = TTSRequestV2(
+ text=self.bambara_tts_text,
+ description=f"{speaker} speaks with natural tone",
+ chunk_size=1.0,
+ )
+ audio_file_v2 = self.sync_client.tts.text_to_speech(
+ request=tts_request_v2,
+ output_file=f"tts_sync_v2_{speaker}_{uuid4().hex}.wav",
+ version=Versions.v2,
+ )
+ print_success(
+ f"TTS v2 ({speaker}): {ConsoleColor.BLUE}{audio_file_v2}{ConsoleColor.RESET}"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"{len(self.supported_speakers)} speakers",
+ )
+
+ except Exception as e:
+ print_error(f"TTS error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ async def test_tts_async(self) -> None:
+ test_name = "Async TTS"
+ print(
+ f"{ConsoleColor.PURPLE}\n{'ASYNCHRONOUS TEXT-TO-SPEECH':^60}{ConsoleColor.RESET}"
+ )
+
+ async with self.async_client as client:
+ try:
+ tts_request_v1 = TTSRequest(text=self.bambara_tts_text, speaker=1)
+ audio_file_v1 = await client.tts.text_to_speech(
+ request=tts_request_v1,
+ output_file=f"tts_async_v1_{uuid4().hex}.wav",
+ version=Versions.v1,
+ )
+ print_success(
+ f"TTS v1 saved: {ConsoleColor.BLUE}{audio_file_v1}{ConsoleColor.RESET}"
+ )
+
+ for speaker in self.supported_speakers:
+ tts_request_v2 = TTSRequestV2(
+ text=self.bambara_tts_text,
+ description=f"{speaker} speaks with natural tone",
+ chunk_size=0.5,
+ )
+ audio_file_v2 = await client.tts.text_to_speech(
+ request=tts_request_v2,
+ output_file=f"tts_async_v2_{speaker}_{uuid4().hex}.wav",
+ version=Versions.v2,
+ )
+ print_success(
+ f"TTS v2 ({speaker}): {ConsoleColor.BLUE}{audio_file_v2}{ConsoleColor.RESET}"
+ )
+
+ self.test_results[test_name] = (
+ "Success",
+ f"{len(self.supported_speakers)} speakers",
+ )
+
+ except Exception as e:
+ print_error(f"Async TTS error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ # ------------------------------
+ # Streaming TTS
+ # ------------------------------
+
+ def test_streaming_tts_sync(self) -> None:
+ test_name = "Sync Streaming TTS"
+ print(
+ f"{ConsoleColor.CYAN}\n{'SYNCHRONOUS STREAMING TTS':^60}{ConsoleColor.RESET}"
+ )
+
+ try:
+ tts_request = TTSRequestV2(
+ text=self.bambara_tts_text,
+ description=f"{self.supported_speakers[0]} speaks with natural conversational tone",
+ )
+ chunk_count = 0
+ total_bytes = 0
+
+ for audio_chunk in self.sync_client.tts.text_to_speech(
+ request=tts_request,
+ output_file=f"stream_tts_sync_{uuid4().hex}.wav",
+ stream=True,
+ version=Versions.v2,
+ ):
+ chunk_count += 1
+ total_bytes += len(audio_chunk)
+ print_success(f"Chunk {chunk_count}: {len(audio_chunk):,} bytes")
+
+ if chunk_count >= self.config.max_stream_chunks:
+ print_info(f"Showing first {self.config.max_stream_chunks} chunks")
+ break
+
+ print_success(
+ f"Streaming complete: {chunk_count} chunks, "
+ f"{ConsoleColor.BLUE}{total_bytes:,} bytes{ConsoleColor.RESET}"
+ )
+ self.test_results[test_name] = ("Success", f"{chunk_count} chunks")
+
+ except Exception as e:
+ print_error(f"Streaming TTS error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ async def test_streaming_tts_async(self) -> None:
+ test_name = "Async Streaming TTS"
+ print(
+ f"{ConsoleColor.PURPLE}\n{'ASYNCHRONOUS STREAMING TTS':^60}{ConsoleColor.RESET}"
+ )
+
+ async with self.async_client as client:
+ try:
+ tts_request = TTSRequestV2(
+ text=self.bambara_tts_text,
+ description=f"{self.supported_speakers[0]} speaks with clear natural tone",
+ )
+ chunk_count = 0
+ total_bytes = 0
+
+ generator = await client.tts.text_to_speech(
+ request=tts_request,
+ output_file=f"stream_tts_async_{uuid4().hex}.wav",
+ stream=True,
+ version=Versions.v2,
+ )
+
+ async for audio_chunk in generator:
+ chunk_count += 1
+ total_bytes += len(audio_chunk)
+ print_success(f"Chunk {chunk_count}: {len(audio_chunk):,} bytes")
+
+ if chunk_count >= self.config.max_stream_chunks:
+ print_info(
+ f"Showing first {self.config.max_stream_chunks} chunks"
+ )
+ break
+
+ print_success(
+ f"Streaming complete: {chunk_count} chunks, "
+ f"{ConsoleColor.BLUE}{total_bytes:,} bytes{ConsoleColor.RESET}"
+ )
+ self.test_results[test_name] = ("Success", f"{chunk_count} chunks")
+
+ except Exception as e:
+ print_error(f"Async streaming TTS error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ # ------------------------------
+ # Advanced Features
+ # ------------------------------
+
+ async def test_parallel_operations(self) -> None:
+ test_name = "Parallel Operations"
+ print(
+ f"{ConsoleColor.CYAN}\n{'PARALLEL API OPERATIONS':^60}{ConsoleColor.RESET}"
+ )
+
+ async with self.async_client as client:
+ try:
+ print_info("Executing parallel operations...")
+
+ translation_request = TranslationRequest(
+ text="Hello", source=Language.ENGLISH, target=Language.BAMBARA
+ )
+
+ tts_request = TTSRequestV2(
+ text=self.bambara_tts_text,
+ description=f"{self.supported_speakers[0]} speaks with clear speaking tone",
+ )
+
+ results = await asyncio.gather(
+ client.translation.get_supported_languages(),
+ client.translation.translate(
+ translation_request, version=Versions.v1
+ ),
+ client.transcription.transcribe(
+ self.config.audio_file_path, version=Versions.v1
+ ),
+ client.tts.text_to_speech(
+ tts_request,
+ output_file=f"parallel_tts_{uuid4().hex}.wav",
+ version=Versions.v2,
+ ),
+ return_exceptions=True,
+ )
+
+ print(f"\n{ConsoleColor.CYAN}Parallel Results:{ConsoleColor.RESET}")
+ process_result("Languages", results[0])
+ process_result("Translation", results[1])
+ process_result("Transcription", results[2])
+ process_result("TTS Output", results[3])
+
+ if all(not isinstance(r, Exception) for r in results):
+ self.test_results[test_name] = (
+ "Success",
+ "All operations completed",
+ )
+ else:
+ failed = [
+ type(r).__name__ for r in results if isinstance(r, Exception)
+ ]
+ self.test_results[test_name] = (
+ "Failed",
+ f"Errors: {', '.join(failed)}",
+ )
+
+ except Exception as e:
+ print_error(f"Parallel operations error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ def test_version_management(self) -> None:
+ test_name = "Version Management"
+ print(f"{ConsoleColor.CYAN}\n{'VERSION MANAGEMENT':^60}{ConsoleColor.RESET}")
+
+ print_success(
+ f"Latest version: {ConsoleColor.YELLOW}{Versions.latest()}{ConsoleColor.RESET}"
+ )
+ print_success(
+ f"Available versions: {ConsoleColor.GRAY}{[str(v) for v in Versions.all_versions()]}"
+ )
+
+ try:
+ request = TranslationRequest(
+ text="Hello world", source=Language.ENGLISH, target=Language.BAMBARA
+ )
+ result = self.sync_client.translation.translate(
+ request=request, version=Versions.v1
+ )
+ print_success(
+ f"Translation v1: '{ConsoleColor.YELLOW}{result.text}{ConsoleColor.RESET}'"
+ )
+
+ for version in [Versions.v1, Versions.v2]:
+ transcription = self.sync_client.transcription.transcribe(
+ self.config.audio_file_path, version=version
+ )
+ handle_transcription_result(transcription, f"v{version.value}")
+
+ tts_v1 = self.sync_client.tts.text_to_speech(
+ request=TTSRequest(text=self.bambara_tts_text, speaker=1),
+ output_file=f"tts_v1_{uuid4().hex}.wav",
+ version=Versions.v1,
+ )
+ print_success(f"TTS v1: {ConsoleColor.BLUE}{tts_v1}{ConsoleColor.RESET}")
+
+ tts_v2 = self.sync_client.tts.text_to_speech(
+ request=TTSRequestV2(
+ text=self.bambara_tts_text,
+ description=f"{self.supported_speakers[0]} speaks with natural tone",
+ ),
+ output_file=f"tts_v2_{uuid4().hex}.wav",
+ version=Versions.v2,
+ )
+ print_success(f"TTS v2: {ConsoleColor.BLUE}{tts_v2}{ConsoleColor.RESET}")
+
+ self.test_results[test_name] = ("Success", "All version tests completed")
+
+ except Exception as e:
+ print_error(f"Version test error: {e}")
+ self.test_results[test_name] = ("Failed", str(e))
+ logging.error(f"Traceback:\n{traceback.format_exc()}")
+
+ # ------------------------------
+ # Main Execution
+ # ------------------------------
+ def run(self) -> None:
+ print(f"\n{ConsoleColor.CYAN}{'=' * 60}{ConsoleColor.RESET}")
+ print(
+ f"{ConsoleColor.YELLOW}{'DJELIA SDK DEVELOPER COOKBOOK':^60}{ConsoleColor.RESET}"
+ )
+ print(f"{ConsoleColor.CYAN}{'=' * 60}{ConsoleColor.RESET}")
+
+ logging.basicConfig(
+ filename="djelia_cookbook.log",
+ level=logging.ERROR,
+ format="%(asctime)s - %(levelname)s - %(message)s",
+ )
+
+ if not self.validate_setup():
+ return
+
+ try:
+ self.test_translation_sync()
+ self.test_transcription_sync()
+ self.test_tts_sync()
+ self.test_streaming_transcription_sync()
+ self.test_streaming_tts_sync()
+ self.test_version_management()
+
+ asyncio.run(self.test_translation_async())
+ asyncio.run(self.test_transcription_async())
+ asyncio.run(self.test_tts_async())
+ asyncio.run(self.test_streaming_transcription_async())
+ asyncio.run(self.test_streaming_tts_async())
+ asyncio.run(self.test_parallel_operations())
+
+ print_summary(self.test_results)
+
+ except KeyboardInterrupt:
+ print_error("Execution interrupted by user")
+ self.test_results["Overall"] = ("Failed", "Interrupted")
+ print_summary(self.test_results)
+ except Exception as e:
+ print_error(f"Unexpected error: {str(e)}")
+ logging.error(f"Unhandled exception:\n{traceback.format_exc()}")
+ self.test_results["Overall"] = ("Failed", "Runtime error")
+ print_summary(self.test_results)
diff --git a/cookbook/env.example b/cookbook/env.example
new file mode 100644
index 0000000..49a08fb
--- /dev/null
+++ b/cookbook/env.example
@@ -0,0 +1,2 @@
+DJELIA_API_KEY=your_api_key_here
+TEST_AUDIO_FILE="Your_test_audio_path" # this is optional hein
\ No newline at end of file
diff --git a/cookbook/main.py b/cookbook/main.py
new file mode 100644
index 0000000..1c4bf27
--- /dev/null
+++ b/cookbook/main.py
@@ -0,0 +1,19 @@
+import os
+import sys
+
+sys.path.append(
+ os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+) # not really necessary but I will keep it, just au cas ou
+
+from .config import Config
+from .cookbook import DjeliaCookbook
+
+
+def main():
+ config = Config.load()
+ cookbook = DjeliaCookbook(config)
+ cookbook.run()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/cookbook/utils.py b/cookbook/utils.py
new file mode 100644
index 0000000..32a06d1
--- /dev/null
+++ b/cookbook/utils.py
@@ -0,0 +1,76 @@
+import logging
+from typing import List, Union
+
+from djelia.models import FrenchTranscriptionResponse, TranscriptionSegment
+
+# ================================================
+# Console Utilities
+# ================================================
+
+
+class ConsoleColor:
+ GREEN = "\033[32m"
+ RED = "\033[31m"
+ YELLOW = "\033[33m"
+ CYAN = "\033[36m"
+ PURPLE = "\033[35m"
+ BLUE = "\033[34m"
+ GRAY = "\033[37m"
+ RESET = "\033[0m"
+
+
+# ================================================
+# Utility Functions
+# ================================================
+
+
+def print_success(message: str) -> None:
+ print(f"{ConsoleColor.GREEN}✓ {message}{ConsoleColor.RESET}")
+
+
+def print_error(message: str) -> None:
+ print(f"{ConsoleColor.RED}✗ {message}{ConsoleColor.RESET}")
+ logging.error(message)
+
+
+def print_info(message: str) -> None:
+ print(f"{ConsoleColor.GRAY}ℹ {message}{ConsoleColor.RESET}")
+
+
+def print_summary(test_results: dict) -> None:
+ print(f"\n{ConsoleColor.CYAN}{'=' * 60}{ConsoleColor.RESET}")
+ print(f"{'Test Summary':^60}")
+ print(f"{ConsoleColor.CYAN}{'=' * 60}{ConsoleColor.RESET}")
+ print(f"{'Test':<40} {'Status':<10} {'Details'}")
+ print(f"{ConsoleColor.GRAY}{'-' * 60}{ConsoleColor.RESET}")
+
+ for test, (status, details) in sorted(test_results.items()):
+ color = ConsoleColor.GREEN if status == "Success" else ConsoleColor.RED
+ print(f"{test:<40} {color}{status:<10}{ConsoleColor.RESET} {details}")
+
+
+def handle_transcription_result(
+ transcription: Union[List[TranscriptionSegment], FrenchTranscriptionResponse],
+ version_info: str,
+) -> None:
+ if isinstance(transcription, list) and transcription:
+ print_success(f"Transcription {version_info}: {len(transcription)} segments")
+ segment = transcription[0]
+ print_success(
+ f"Sample: {segment.start:.1f}s-{segment.end:.1f}s: "
+ f"'{ConsoleColor.YELLOW}{segment.text}{ConsoleColor.RESET}'"
+ )
+ elif hasattr(transcription, "text"):
+ print_success(
+ f"Transcription {version_info}: "
+ f"'{ConsoleColor.YELLOW}{transcription.text}{ConsoleColor.RESET}'"
+ )
+ else:
+ print_error(f"Unexpected result format for {version_info}")
+
+
+def process_result(name: str, result: object) -> None:
+ if isinstance(result, Exception):
+ print_error(f"{name}: {str(result)}")
+ else:
+ print_success(f"{name}: Received {type(result).__name__}")
diff --git a/dev-requirements.txt b/dev-requirements.txt
new file mode 100644
index 0000000..590b965
--- /dev/null
+++ b/dev-requirements.txt
@@ -0,0 +1,4 @@
+ruff==0.11.4
+isort==6.0.1
+pre-commit==4.2.0
+# black==25.1.0
diff --git a/djelia/__init__.py b/djelia/__init__.py
index a2dee8d..07e0bbc 100644
--- a/djelia/__init__.py
+++ b/djelia/__init__.py
@@ -1,34 +1,3 @@
-from .client import Djelia, AsyncDjelia
-from .constants import djelia_config
+from djelia.src.client import Djelia, DjeliaAsync
-
-SUPPORTED_LANGUAGES = djelia_config.SUPPORTED_LANGUAGES
-VALID_SPEAKER_IDS = djelia_config.VALID_SPEAKER_IDS
-
-from .exceptions import (
- DjeliaError,
- AuthenticationError,
- ValidationError,
- APIError,
- LanguageError,
- SpeakerError,
- AudioFormatError
-)
-
-__version__ = "0.2.0"
-
-__all__ = [
- 'Djelia',
- 'AsyncDjelia',
- 'SUPPORTED_LANGUAGES',
- 'VALID_SPEAKER_IDS'
-]
-
- # let's close those component
- # 'DjeliaError',
- # 'AuthenticationError',
- # 'ValidationError',
- # 'APIError',
- # 'LanguageError',
- # 'SpeakerError',
- # 'AudioFormatError'
\ No newline at end of file
+__all__ = ["Djelia", "DjeliaAsync"]
diff --git a/djelia/client.py b/djelia/client.py
deleted file mode 100644
index 0a54b10..0000000
--- a/djelia/client.py
+++ /dev/null
@@ -1,361 +0,0 @@
-import os
-import json
-import requests
-from typing import List, Dict, Union, Optional, BinaryIO, Generator, AsyncGenerator
-
-import aiohttp
-from .constants import djelia_config
-
-from .exceptions import (
- DjeliaError,
- AuthenticationError,
- ValidationError,
- APIError,
- LanguageError,
- SpeakerError,
- AudioFormatError
-)
-
-
-class Djelia:
- def __init__(self, api_key: str = None):
- self.api_key = api_key or os.environ.get(djelia_config.ENV_API_KEY)
- if not self.api_key:
- raise AuthenticationError(
- f"API key is required. Provide it as an argument or set the {djelia_config.ENV_API_KEY} environment variable."
- )
-
- self.headers = {
- djelia_config.API_KEY_HEADER: self.api_key
- }
-
-
- def _handle_response_error(self, response: requests.Response) -> None:
- if response.status_code == 401:
- raise AuthenticationError("Invalid API key or unauthorized access")
-
- elif response.status_code == 422:
- try:
- error_detail = response.json().get("detail", "Validation failed")
- raise ValidationError(f"Validation error: {error_detail}")
- except (json.JSONDecodeError, AttributeError):
- raise ValidationError("Validation error")
-
- elif response.status_code != 200:
- try:
- error_msg = response.json().get("detail", response.text or "Unknown error")
- except (json.JSONDecodeError, AttributeError):
- error_msg = response.text or "Unknown error"
- raise APIError(response.status_code, error_msg)
-
-
- def get_supported_languages(self, version: int = 1) -> List[Dict[str, str]]:
- if version not in djelia_config.MODELS_VERSION.supported_languages:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.supported_languages}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.supported_languages.get(version)}"
- response = requests.get(url, headers=self.headers)
-
- if response.status_code != 200:
- self._handle_response_error(response)
-
- return response.json()
-
-
- def translate(self, text: str, source: str, target: str, version: int = 1) -> Dict[str, str]:
- if version not in djelia_config.MODELS_VERSION.translate:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.translate}")
-
- if source not in djelia_config.SUPPORTED_LANGUAGES:
- raise LanguageError(f"Source language '{source}' not supported. Must be one of {djelia_config.SUPPORTED_LANGUAGES.keys()}")
-
- if target not in djelia_config.SUPPORTED_LANGUAGES:
- raise LanguageError(f"Target language '{target}' not supported. Must be one of {djelia_config.SUPPORTED_LANGUAGES.keys()}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.translate.get(version)}"
- data = {
- "text": text,
- "source": djelia_config.SUPPORTED_LANGUAGES.get(source),
- "target": djelia_config.SUPPORTED_LANGUAGES.get(target)
- }
- headers = {**self.headers, "Content-Type": "application/json"}
- response = requests.post(url, headers=headers, json=data)
-
- if response.status_code != 200:
- self._handle_response_error(response)
-
- return response.json()
-
-
- def transcribe(self, audio_file: Union[str, BinaryIO], translate_to_french: bool = False, version: int = 2) -> Union[List[Dict], Dict]:
- if version not in djelia_config.MODELS_VERSION.transcription:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.transcription}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.transcribe.get(version)}"
- params = {"translate_to_french": translate_to_french}
-
- try:
- if isinstance(audio_file, str):
- with open(audio_file, 'rb') as f:
- files = {"file": f}
- response = requests.post(url, headers=self.headers, params=params, files=files)
- else:
- files = {"file": audio_file}
- response = requests.post(url, headers=self.headers, params=params, files=files)
- except IOError as e:
- raise IOError(f"Could not read audio file: {str(e)}")
-
- if response.status_code != 200:
- self._handle_response_error(response)
-
- return response.json()
-
-
- def stream_transcribe(self, audio_file: Union[str, BinaryIO], translate_to_french: bool = False, version: int = 1) -> Generator[Dict, None, None]:
- if version not in djelia_config.MODELS_VERSION.transcribe_stream:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.transcribe_stream}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.transcribe_stream.get(version)}"
- params = {"translate_to_french": translate_to_french}
-
- try:
- if isinstance(audio_file, str):
- with open(audio_file, 'rb') as f:
- files = {"file": f}
- response = requests.post(url, headers=self.headers, params=params, files=files, stream=True)
- else:
- files = {"file": audio_file}
- response = requests.post(url, headers=self.headers, params=params, files=files, stream=True)
- except IOError as e:
- raise IOError(f"Could not read audio file: {str(e)}")
-
- if response.status_code != 200:
- self._handle_response_error(response)
-
- for line in response.iter_lines():
- if line:
- try:
- yield json.loads(line.decode('utf-8'))
- except json.JSONDecodeError:
- continue
-
-
- def text_to_speech(self, text: str, speaker: int = djelia_config.DEFAULT_SPEAKER_ID, output_file: Optional[str] = None, version: int = 1) -> Union[bytes, str]:
- if version not in djelia_config.MODELS_VERSION.text_to_speech:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.text_to_speech}")
-
- if speaker not in djelia_config.VALID_SPEAKER_IDS:
- raise SpeakerError(f"Speaker ID must be one of {djelia_config.VALID_SPEAKER_IDS}, got {speaker}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.text_to_speech.get(version)}"
- data = {
- "text": text,
- "speaker": speaker
- }
- headers = {**self.headers, "Content-Type": "application/json"}
- response = requests.post(url, headers=headers, json=data)
-
- if response.status_code != 200:
- self._handle_response_error(response)
-
- if output_file:
- try:
- with open(output_file, 'wb') as f:
- f.write(response.content)
- return output_file
- except IOError as e:
- raise IOError(f"Failed to save audio file: {str(e)}")
- else:
- return response.content
-
-
-
-class AsyncDjelia:
- def __init__(self, api_key: str = None):
- self.api_key = api_key or os.environ.get(djelia_config.ENV_API_KEY)
- if not self.api_key:
- raise AuthenticationError(
- f"API key is required. Provide it as an argument or set the {djelia_config.ENV_API_KEY} environment variable."
- )
-
- self.headers = {
- djelia_config.API_KEY_HEADER: self.api_key
- }
- self._session = None
-
- @property
- def session(self):
- if self._session is None or self._session.closed:
- self._session = aiohttp.ClientSession()
- return self._session
-
- async def close(self):
- if self._session and not self._session.closed:
- await self._session.close()
- self._session = None
-
- async def __aenter__(self):
- return self
-
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.close()
-
- async def _handle_response_error(self, response: aiohttp.ClientResponse) -> None:
- if response.status == 401:
- raise AuthenticationError("Invalid API key or unauthorized access")
- elif response.status == 422:
- try:
- error_data = await response.json()
- error_detail = error_data.get("detail", "Validation failed")
- raise ValidationError(f"Validation error: {error_detail}")
- except (json.JSONDecodeError, aiohttp.ContentTypeError):
- raise ValidationError("Validation error")
- elif response.status != 200:
- try:
- error_data = await response.json()
- error_msg = error_data.get("detail", await response.text() or "Unknown error")
- except (json.JSONDecodeError, aiohttp.ContentTypeError):
- error_msg = await response.text() or "Unknown error"
- raise APIError(response.status, error_msg)
-
-
- async def get_supported_languages(self, version: int = 1) -> List[Dict[str, str]]:
- if version not in djelia_config.MODELS_VERSION.supported_languages:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.supported_languages}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.supported_languages.get(version)}"
-
- async with self.session.get(url, headers=self.headers) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- return await response.json()
-
-
- async def translate(self, text: str, source: str, target: str, version: int = 1) -> Dict[str, str]:
- if version not in djelia_config.MODELS_VERSION.translate:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.translate}")
-
- if source not in djelia_config.SUPPORTED_LANGUAGES:
- raise LanguageError(f"Source language '{source}' not supported. Must be one of {djelia_config.SUPPORTED_LANGUAGES.keys()}")
- if target not in djelia_config.SUPPORTED_LANGUAGES:
- raise LanguageError(f"Target language '{target}' not supported. Must be one of {djelia_config.SUPPORTED_LANGUAGES.keys()}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.translate.get(version)}"
- data = {
- "text": text,
- "source": djelia_config.SUPPORTED_LANGUAGES.get(source),
- "target": djelia_config.SUPPORTED_LANGUAGES.get(target)
- }
- headers = {**self.headers, "Content-Type": "application/json"}
-
- async with self.session.post(url, headers=headers, json=data) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- return await response.json()
-
-
- async def transcribe(self, audio_file: Union[str, BinaryIO], translate_to_french: bool = False, version: int = 1) -> Union[List[Dict], Dict]:
- if version not in djelia_config.MODELS_VERSION.transcription:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.transcription}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.transcribe.get(version)}"
- params = {"translate_to_french": str(translate_to_french).lower() }
-
- try:
- if isinstance(audio_file, str):
- with open(audio_file, 'rb') as f:
- data = aiohttp.FormData()
- data.add_field('file', f.read(), filename=os.path.basename(audio_file))
-
- async with self.session.post(url, headers=self.headers, params=params, data=data) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- return await response.json()
- else:
- data = aiohttp.FormData()
- data.add_field('file', audio_file.read(), filename='audio_file')
-
- async with self.session.post(url, headers=self.headers, params=params, data=data) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- return await response.json()
- except IOError as e:
- raise IOError(f"Could not read audio file: {str(e)}")
-
-
- async def stream_transcribe(self, audio_file: Union[str, BinaryIO], translate_to_french: bool = False, version: int = 1) -> AsyncGenerator[Dict, None]:
- if version not in djelia_config.MODELS_VERSION.transcribe_stream:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.transcribe_stream}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.transcribe_stream.get(version)}"
- params = {"translate_to_french": str(translate_to_french).lower() }
-
- try:
- if isinstance(audio_file, str):
- with open(audio_file, 'rb') as f:
- data = aiohttp.FormData()
- data.add_field('file', f.read(), filename=os.path.basename(audio_file))
-
- async with self.session.post(url, headers=self.headers, params=params, data=data) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- async for line in response.content:
- line = line.strip()
- if line:
- try:
- yield json.loads(line.decode('utf-8'))
- except json.JSONDecodeError:
- continue
- else:
- data = aiohttp.FormData()
- data.add_field('file', audio_file.read(), filename='audio_file')
-
- async with self.session.post(url, headers=self.headers, params=params, data=data) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- async for line in response.content:
- line = line.strip()
- if line:
- try:
- yield json.loads(line.decode('utf-8'))
- except json.JSONDecodeError:
- continue
- except IOError as e:
- raise IOError(f"Could not read audio file: {str(e)}")
-
-
- async def text_to_speech(self, text: str, speaker: int = djelia_config.DEFAULT_SPEAKER_ID, output_file: Optional[str] = None, version: int = 1) -> Union[bytes, str]:
- if version not in djelia_config.MODELS_VERSION.text_to_speech:
- raise ValidationError(f"Version must be one of {djelia_config.MODELS_VERSION.text_to_speech}")
-
- if speaker not in djelia_config.VALID_SPEAKER_IDS:
- raise SpeakerError(f"Speaker ID must be one of {djelia_config.VALID_SPEAKER_IDS}, got {speaker}")
-
- url = f"{djelia_config.BASE_URL}{djelia_config.ENDPOINTS.text_to_speech.get(version)}"
- data = {
- "text": text,
- "speaker": speaker
- }
- headers = {**self.headers, "Content-Type": "application/json"}
-
- async with self.session.post(url, headers=headers, json=data) as response:
- if response.status != 200:
- await self._handle_response_error(response)
-
- content = await response.read()
-
- if output_file:
- try:
- with open(output_file, 'wb') as f:
- f.write(content)
- return output_file
- except IOError as e:
- raise IOError(f"Failed to save audio file: {str(e)}")
- else:
- return content
\ No newline at end of file
diff --git a/djelia/config/__init__.py b/djelia/config/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/djelia/config/settings.py b/djelia/config/settings.py
new file mode 100644
index 0000000..6339cff
--- /dev/null
+++ b/djelia/config/settings.py
@@ -0,0 +1,8 @@
+BASE_URL = "https://djelia.cloud"
+API_KEY_HEADER = "x-api-key"
+ENV_API_KEY = "DJELIA_API_KEY"
+
+VALID_SPEAKER_IDS = [0, 1, 2, 3, 4]
+DEFAULT_SPEAKER_ID = 1
+
+VALID_TTS_V2_SPEAKERS = ["Moussa", "Sekou", "Seydou"]
diff --git a/djelia/constants.py b/djelia/constants.py
deleted file mode 100644
index 186fc8b..0000000
--- a/djelia/constants.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from dataclasses import dataclass
-from typing import Dict, List
-
-@dataclass(frozen=True)
-class EndpointConfig:
- translate: Dict[int, str]
- supported_languages: Dict[int, str]
- transcribe: Dict[int, str]
- transcribe_stream: Dict[int, str]
- text_to_speech: Dict[int, str]
-
-
-@dataclass(frozen=True)
-class ModelsVersionConfig:
- transcription: List[int]
- translate: List[int]
- supported_languages: List[int]
- transcribe_stream: List[int]
- text_to_speech: List[int]
-
-
-@dataclass(frozen=True)
-class DjeliaApiConfig:
- BASE_URL: str
- API_KEY_HEADER: str
- ENV_API_KEY: str
- ENDPOINTS: EndpointConfig
- SUPPORTED_LANGUAGES: Dict[str, str]
- VALID_SPEAKER_IDS: List[int]
- DEFAULT_SPEAKER_ID: int
- MODELS_VERSION: ModelsVersionConfig
-
-
-endpoints = EndpointConfig(
- translate={1: "/api/v1/models/translate"},
- supported_languages={1: "/api/v1/models/translate/supported-languages"},
- transcribe={
- 1: "/api/v1/models/transcribe",
- 2: "/api/v2/models/transcribe"
- },
- transcribe_stream={
- 1: "/api/v1/models/transcribe/stream",
- 2: "/api/v2/models/transcribe/stream"
- },
- text_to_speech={1: "/api/v1/models/tts"}
-)
-
-models_version = ModelsVersionConfig(
- transcription=[1, 2],
- translate=[1],
- supported_languages=[1],
- transcribe_stream=[1, 2],
- text_to_speech=[1]
-)
-
-djelia_config = DjeliaApiConfig(
- BASE_URL="https://djelia.cloud",
- API_KEY_HEADER="x-api-key",
- ENV_API_KEY="DJELIA_API_KEY",
- ENDPOINTS=endpoints,
- SUPPORTED_LANGUAGES={
- "fr": "fra_Latn",
- "en": "eng_Latn",
- "bam": "bam_Latn",
- },
- VALID_SPEAKER_IDS=[0, 1, 2, 3, 4],
- DEFAULT_SPEAKER_ID=1,
- MODELS_VERSION=models_version
-)
\ No newline at end of file
diff --git a/djelia/models/__init__.py b/djelia/models/__init__.py
new file mode 100644
index 0000000..bb8d525
--- /dev/null
+++ b/djelia/models/__init__.py
@@ -0,0 +1,23 @@
+from .models import ErrorsMessage # TranscriptionRequest,
+from .models import (DjeliaRequest, FrenchTranscriptionResponse,
+ HttpRequestInfo, Language, Params,
+ SupportedLanguageSchema, TranscriptionSegment,
+ TranslationRequest, TranslationResponse, TTSRequest,
+ TTSRequestV2, Versions)
+
+__all__ = [
+ "Language",
+ "DjeliaRequest",
+ "HttpRequestInfo",
+ "TranscriptionRequest",
+ "TranslationRequest",
+ "TTSRequest",
+ "SupportedLanguageSchema",
+ "TranscriptionSegment",
+ "TranslationResponse",
+ "FrenchTranscriptionResponse",
+ "Params",
+ "ErrorsMessage",
+ "Versions",
+ "TTSRequestV2",
+]
diff --git a/djelia/models/models.py b/djelia/models/models.py
new file mode 100644
index 0000000..c5ec5f0
--- /dev/null
+++ b/djelia/models/models.py
@@ -0,0 +1,119 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+from pydantic import BaseModel, Field
+
+from djelia.config.settings import BASE_URL, ENV_API_KEY
+
+
+class Language(str, Enum):
+ FRENCH = "fra_Latn"
+ ENGLISH = "eng_Latn"
+ BAMBARA = "bam_Latn"
+
+
+class Versions(int, Enum):
+ v1 = 1
+ v2 = 2
+
+ @classmethod
+ def latest(cls):
+ return max(cls, key=lambda x: x.value)
+
+ @classmethod
+ def all_versions(cls):
+ return list(cls)
+
+ def __str__(self):
+ return f"v{self.value}"
+
+
+@dataclass
+class HttpRequestInfo:
+ endpoint: str
+ method: str
+
+
+class DjeliaRequest:
+ endpoint_prefix = f"{BASE_URL}/api/v{{}}/models/"
+
+ get_supported_languages: HttpRequestInfo = HttpRequestInfo(
+ endpoint=endpoint_prefix + "translate/supported-languages", method="GET"
+ )
+ translate: HttpRequestInfo = HttpRequestInfo(
+ endpoint=endpoint_prefix + "translate", method="POST"
+ )
+
+ transcribe: HttpRequestInfo = HttpRequestInfo(
+ endpoint=endpoint_prefix + "transcribe", method="POST"
+ )
+
+ transcribe_stream: HttpRequestInfo = HttpRequestInfo(
+ endpoint=endpoint_prefix + "transcribe/stream", method="POST"
+ )
+
+ tts: HttpRequestInfo = HttpRequestInfo(
+ endpoint=endpoint_prefix + "tts", method="POST"
+ )
+
+ tts_stream: HttpRequestInfo = HttpRequestInfo(
+ endpoint=endpoint_prefix + "tts/stream", method="POST"
+ )
+
+
+class TranslationRequest(BaseModel):
+ text: str
+ source: Language
+ target: Language
+
+
+class TTSRequest(BaseModel):
+ text: str
+ speaker: Optional[int] = 1
+
+
+class TTSRequestV2(BaseModel):
+ text: str = Field(..., max_length=1000)
+ description: str
+ chunk_size: Optional[float] = Field(default=1.0, ge=0.1, le=2.0)
+
+
+class SupportedLanguageSchema(BaseModel):
+ code: str
+ name: str
+
+
+class TranslationResponse(BaseModel):
+ text: str
+
+
+class TranscriptionSegment(BaseModel):
+ text: str
+ start: float
+ end: float
+
+
+class FrenchTranscriptionResponse(BaseModel):
+ text: str
+
+
+class Params:
+ file: str = "file"
+ translate_to_french: str = "translate_to_french"
+ filename: str = "audio_file"
+
+
+class ErrorsMessage:
+ ioerror_save: str = "Failed to save audio file:\n Exception {}"
+ ioerror_read: str = "Could not read audio file:\n Exception {}"
+ speaker_description_error: str = (
+ "Description must contain one of the supported speakers: {}"
+ )
+ speaker_id_error: str = "Speaker ID must be one of {}, got {}"
+ api_key_missing: str = (
+ f"API key must be provided via parameter or {ENV_API_KEY} environment variable"
+ )
+ tts_v1_request_error: str = "TTSRequest required for V1"
+ tts_v2_request_error: str = "TTSRequestV2 required for V2"
+ tts_streaming_compatibility: str = "Streaming is only available for TTS V2"
diff --git a/djelia/src/__init__.py b/djelia/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/djelia/src/auth/__init__.py b/djelia/src/auth/__init__.py
new file mode 100644
index 0000000..188cd9d
--- /dev/null
+++ b/djelia/src/auth/__init__.py
@@ -0,0 +1,3 @@
+from .auth import Auth
+
+__all__ = ["Auth"]
diff --git a/djelia/src/auth/auth.py b/djelia/src/auth/auth.py
new file mode 100644
index 0000000..864055d
--- /dev/null
+++ b/djelia/src/auth/auth.py
@@ -0,0 +1,15 @@
+# from djelia.config.settings import API_KEY_HEADER, ENV_API_KEY
+import os
+
+from djelia.models.models import API_KEY_HEADER, ENV_API_KEY, ErrorsMessage
+
+
+class Auth:
+ def __init__(self, api_key: str = None):
+ self.api_key = api_key or os.environ.get(ENV_API_KEY)
+
+ if not self.api_key:
+ raise ValueError(ErrorsMessage.api_key_missing)
+
+ def get_headers(self):
+ return {API_KEY_HEADER: self.api_key}
diff --git a/djelia/src/client/__init__.py b/djelia/src/client/__init__.py
new file mode 100644
index 0000000..f3ab24f
--- /dev/null
+++ b/djelia/src/client/__init__.py
@@ -0,0 +1,3 @@
+from .client import Djelia, DjeliaAsync
+
+__all__ = ["Djelia", "DjeliaAsync"]
diff --git a/djelia/src/client/client.py b/djelia/src/client/client.py
new file mode 100644
index 0000000..e02344e
--- /dev/null
+++ b/djelia/src/client/client.py
@@ -0,0 +1,119 @@
+import aiohttp
+import requests
+from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
+ wait_random_exponential)
+
+from djelia.src.auth import Auth
+from djelia.src.services import (TTS, AsyncTranscription, AsyncTranslation,
+ AsyncTTS, Transcription, Translation)
+from djelia.utils.errors import api_exception, general_exception
+
+
+class Djelia:
+ def __init__(self, api_key: str = None):
+ self.auth = Auth(api_key)
+ self.translation = Translation(self)
+ self.transcription = Transcription(self)
+ self.tts = TTS(self)
+
+ @retry(
+ retry=retry_if_exception_type(Exception),
+ wait=wait_random_exponential(multiplier=1, max=40),
+ stop=stop_after_attempt(3),
+ )
+ def _make_request(self, method: str, endpoint: str, **kwargs):
+ headers = self.auth.get_headers()
+
+ if "params" in kwargs:
+ params = kwargs["params"]
+ for key, value in params.items():
+ if isinstance(value, bool):
+ params[key] = str(value).lower()
+
+ try:
+ response = requests.request(method, endpoint, headers=headers, **kwargs)
+ response.raise_for_status()
+ return response
+ except requests.exceptions.HTTPError as e:
+ raise api_exception(code=e.response.status_code, error=e)
+ except requests.exceptions.RequestException as e:
+ raise general_exception(error=e)
+
+
+class DjeliaAsync:
+ def __init__(self, api_key: str = None):
+ self.auth = Auth(api_key)
+ self.translation = AsyncTranslation(self)
+ self.transcription = AsyncTranscription(self)
+ self.tts = AsyncTTS(self)
+ self._session = None
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.close()
+
+ async def close(self):
+ if self._session and not self._session.closed:
+ await self._session.close()
+ self._session = None
+
+ @property
+ def session(self):
+ if self._session is None or self._session.closed:
+ self._session = aiohttp.ClientSession()
+ return self._session
+
+ @retry(
+ retry=retry_if_exception_type(Exception),
+ wait=wait_random_exponential(multiplier=1, max=40),
+ stop=stop_after_attempt(3),
+ )
+ async def _make_request(self, method: str, endpoint: str, **kwargs):
+ headers = self.auth.get_headers()
+
+ if "params" in kwargs:
+ params = kwargs["params"]
+ for key, value in params.items():
+ if isinstance(value, bool):
+ params[key] = str(value).lower()
+
+ async with self.session.request(
+ method, endpoint, headers=headers, **kwargs
+ ) as response:
+ try:
+ response.raise_for_status()
+ content_type = response.headers.get("content-type", "").lower()
+
+ if "application/json" in content_type:
+ return await response.json()
+ else:
+ return await response.read()
+
+ except aiohttp.ClientResponseError as e:
+ raise api_exception(code=e.status, error=e)
+ except aiohttp.ClientError as e:
+ raise general_exception(error=e)
+
+ async def _make_streaming_request(self, method: str, endpoint: str, **kwargs):
+ headers = self.auth.get_headers()
+
+ if "params" in kwargs:
+ params = kwargs["params"]
+ for key, value in params.items():
+ if isinstance(value, bool):
+ params[key] = str(value).lower()
+
+ response = await self.session.request(
+ method, endpoint, headers=headers, **kwargs
+ )
+ try:
+ response.raise_for_status()
+ return response
+ except (aiohttp.ClientResponseError, aiohttp.ClientError) as e:
+ await response.close()
+ if isinstance(e, aiohttp.ClientResponseError):
+ raise api_exception(code=e.status, error=e)
+ else:
+ raise general_exception(error=e)
diff --git a/djelia/src/services/__init__.py b/djelia/src/services/__init__.py
new file mode 100644
index 0000000..e37ffd1
--- /dev/null
+++ b/djelia/src/services/__init__.py
@@ -0,0 +1,12 @@
+from .transcription import AsyncTranscription, Transcription
+from .translation import AsyncTranslation, Translation
+from .tts import TTS, AsyncTTS
+
+__all__ = [
+ "Transcription",
+ "AsyncTranscription",
+ "Translation",
+ "AsyncTranslation",
+ "TTS",
+ "AsyncTTS",
+]
diff --git a/djelia/src/services/transcription.py b/djelia/src/services/transcription.py
new file mode 100644
index 0000000..68d9272
--- /dev/null
+++ b/djelia/src/services/transcription.py
@@ -0,0 +1,239 @@
+import json
+import os
+from typing import AsyncGenerator, BinaryIO, Generator, List, Optional, Union
+
+import aiohttp
+
+from djelia.models import (DjeliaRequest, ErrorsMessage,
+ FrenchTranscriptionResponse, Params,
+ TranscriptionSegment, Versions)
+
+
+class Transcription:
+ def __init__(self, client):
+ self.client = client
+
+ def transcribe(
+ self,
+ audio_file: Union[str, BinaryIO],
+ translate_to_french: Optional[bool] = False,
+ stream: Optional[bool] = False,
+ version: Optional[Versions] = Versions.v2,
+ ) -> Union[List[TranscriptionSegment], FrenchTranscriptionResponse, Generator]:
+ if not stream:
+ try:
+ params = {Params.translate_to_french: str(translate_to_french).lower()}
+ if isinstance(audio_file, str):
+ with open(audio_file, "rb") as f:
+ files = {Params.file: f}
+ response = self.client._make_request(
+ method=DjeliaRequest.transcribe.method,
+ endpoint=DjeliaRequest.transcribe.endpoint.format(
+ version.value
+ ),
+ files=files,
+ params=params,
+ )
+ else:
+ files = {Params.file: audio_file}
+ response = self.client._make_request(
+ method=DjeliaRequest.transcribe.method,
+ endpoint=DjeliaRequest.transcribe.endpoint.format(
+ version.value
+ ),
+ files=files,
+ params=params,
+ )
+
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_read.format(str(e)))
+
+ data = response.json()
+ return (
+ FrenchTranscriptionResponse(**data)
+ if translate_to_french
+ else [TranscriptionSegment(**segment) for segment in data]
+ )
+
+ else:
+ return self._stream_transcribe(audio_file, translate_to_french, version)
+
+ def _stream_transcribe(
+ self,
+ audio_file: Union[str, BinaryIO],
+ translate_to_french: bool = False,
+ version: Optional[Versions] = Versions.v2,
+ ) -> Generator[
+ Union[TranscriptionSegment, FrenchTranscriptionResponse], None, None
+ ]:
+ try:
+ params = {Params.translate_to_french: str(translate_to_french).lower()}
+ if isinstance(audio_file, str):
+ with open(audio_file, "rb") as f:
+ files = {Params.file: f}
+ response = self.client._make_request(
+ method=DjeliaRequest.transcribe.method,
+ endpoint=DjeliaRequest.transcribe.endpoint.format(
+ version.value
+ ),
+ files=files,
+ params=params,
+ )
+ else:
+ files = {Params.file: audio_file}
+ response = self.client._make_request(
+ method=DjeliaRequest.transcribe_stream.method,
+ endpoint=DjeliaRequest.transcribe.endpoint.format(version.value),
+ files=files,
+ params=params,
+ )
+
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_read.format(str(e)))
+
+ for line in response.iter_lines():
+ if line:
+ try:
+ data = json.loads(line.decode("utf-8"))
+ if isinstance(data, list):
+ for segment in data:
+ yield (
+ FrenchTranscriptionResponse(**segment)
+ if translate_to_french
+ else TranscriptionSegment(**segment)
+ )
+ else:
+ yield (
+ FrenchTranscriptionResponse(**data)
+ if translate_to_french
+ else TranscriptionSegment(**data)
+ )
+ except json.JSONDecodeError:
+ continue
+
+
+class AsyncTranscription:
+ def __init__(self, client):
+ self.client = client
+
+ async def transcribe(
+ self,
+ audio_file: Union[str, BinaryIO],
+ translate_to_french: Optional[bool] = False,
+ stream: Optional[bool] = False,
+ version: Optional[Versions] = Versions.v2,
+ ) -> Union[List[TranscriptionSegment], FrenchTranscriptionResponse, AsyncGenerator]:
+ if not stream:
+ try:
+ data = aiohttp.FormData()
+ if isinstance(audio_file, str):
+ with open(audio_file, "rb") as f:
+ data.add_field(
+ Params.file, f.read(), filename=os.path.basename(audio_file)
+ )
+ else:
+ data.add_field(
+ Params.file, audio_file.read(), filename=Params.filename
+ )
+
+ params = {Params.translate_to_french: str(translate_to_french).lower()}
+ response_data = await self.client._make_request(
+ method=DjeliaRequest.transcribe.method,
+ endpoint=DjeliaRequest.transcribe.endpoint.format(version.value),
+ data=data,
+ params=params,
+ )
+
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_read.format(str(e)))
+
+ return (
+ FrenchTranscriptionResponse(**response_data)
+ if translate_to_french
+ else [TranscriptionSegment(**segment) for segment in response_data]
+ )
+
+ else:
+ return self._stream_transcribe(audio_file, translate_to_french, version)
+
+ async def _stream_transcribe(
+ self,
+ audio_file: Union[str, BinaryIO],
+ translate_to_french: bool = False,
+ version: Optional[Versions] = Versions.v2,
+ ) -> AsyncGenerator[Union[TranscriptionSegment, FrenchTranscriptionResponse], None]:
+ try:
+ data = aiohttp.FormData()
+ if isinstance(audio_file, str):
+ with open(audio_file, "rb") as f:
+ data.add_field(
+ Params.file, f.read(), filename=os.path.basename(audio_file)
+ )
+ else:
+ data.add_field(Params.file, audio_file.read(), filename=Params.filename)
+
+ params = {Params.translate_to_french: str(translate_to_french).lower()}
+ response = await self.client._make_streaming_request(
+ method=DjeliaRequest.transcribe_stream.method,
+ endpoint=DjeliaRequest.transcribe_stream.endpoint.format(version.value),
+ data=data,
+ params=params,
+ )
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_read.format(str(e)))
+
+ try:
+ if hasattr(response, "content") and response.content:
+ async for line in response.content:
+ if line:
+ try:
+ line_str = line.decode("utf-8").strip()
+ if line_str:
+ segment_data = json.loads(line_str)
+
+ if isinstance(segment_data, list):
+ for segment in segment_data:
+ yield (
+ FrenchTranscriptionResponse(**segment)
+ if translate_to_french
+ else TranscriptionSegment(**segment)
+ )
+ else:
+ yield (
+ FrenchTranscriptionResponse(**segment_data)
+ if translate_to_french
+ else TranscriptionSegment(**segment_data)
+ )
+ except Exception: # I will comeback here for better handling
+ continue
+ else:
+ try:
+ response_text = await response.text()
+ if response_text.strip():
+ segment_data = json.loads(response_text)
+
+ if isinstance(segment_data, list):
+ for segment in segment_data:
+ yield (
+ FrenchTranscriptionResponse(**segment)
+ if translate_to_french
+ else TranscriptionSegment(**segment)
+ )
+ else:
+ yield (
+ FrenchTranscriptionResponse(**segment_data)
+ if translate_to_french
+ else TranscriptionSegment(**segment_data)
+ )
+ except Exception:
+ pass
+
+ except Exception as e:
+ raise e
+ finally:
+ try:
+ if hasattr(response, "close"):
+ await response.close()
+ except Exception:
+ # there is a know issue here due to the server response
+ pass
diff --git a/djelia/src/services/translation.py b/djelia/src/services/translation.py
new file mode 100644
index 0000000..4bc7e46
--- /dev/null
+++ b/djelia/src/services/translation.py
@@ -0,0 +1,56 @@
+from typing import List, Optional
+
+from djelia.models import (DjeliaRequest, SupportedLanguageSchema,
+ TranslationRequest, TranslationResponse, Versions)
+
+
+class Translation:
+ def __init__(self, client):
+ self.client = client
+
+ def get_supported_languages(self) -> List[SupportedLanguageSchema]:
+ response = self.client._make_request(
+ method=DjeliaRequest.get_supported_languages.method,
+ endpoint=DjeliaRequest.get_supported_languages.endpoint.format(
+ Versions.v1.value
+ ),
+ )
+ return [SupportedLanguageSchema(**lang) for lang in response.json()]
+
+ def translate(
+ self,
+ request: TranslationRequest,
+ version: Optional[Versions] = Versions.v1.value,
+ ) -> TranslationResponse:
+ data = request.dict()
+ response = self.client._make_request(
+ method=DjeliaRequest.translate.method,
+ endpoint=DjeliaRequest.translate.endpoint.format(version.value),
+ json=data,
+ )
+ return TranslationResponse(**response.json())
+
+
+class AsyncTranslation:
+ def __init__(self, client):
+ self.client = client
+
+ async def get_supported_languages(self) -> List[SupportedLanguageSchema]:
+ data = await self.client._make_request(
+ method=DjeliaRequest.get_supported_languages.method,
+ endpoint=DjeliaRequest.get_supported_languages.endpoint.format(
+ Versions.v1.value
+ ),
+ )
+ return [SupportedLanguageSchema(**lang) for lang in data]
+
+ async def translate(
+ self, request: TranslationRequest, version: Optional[Versions] = Versions.v1
+ ) -> TranslationResponse:
+ request_data = request.dict()
+ data = await self.client._make_request(
+ method=DjeliaRequest.translate.method,
+ endpoint=DjeliaRequest.translate.endpoint.format(version.value),
+ json=request_data,
+ )
+ return TranslationResponse(**data)
diff --git a/djelia/src/services/tts.py b/djelia/src/services/tts.py
new file mode 100644
index 0000000..f44e57d
--- /dev/null
+++ b/djelia/src/services/tts.py
@@ -0,0 +1,332 @@
+from typing import AsyncGenerator, Generator, Optional, Union
+
+from djelia.config.settings import VALID_SPEAKER_IDS, VALID_TTS_V2_SPEAKERS
+from djelia.models import (DjeliaRequest, ErrorsMessage, TTSRequest,
+ TTSRequestV2, Versions)
+from djelia.utils.exceptions import SpeakerError
+
+
+class TTS:
+ def __init__(self, client):
+ self.client = client
+
+ def text_to_speech(
+ self,
+ request: Union[TTSRequest, TTSRequestV2],
+ output_file: Optional[str] = None,
+ stream: Optional[bool] = False,
+ version: Optional[Versions] = Versions.v1,
+ ) -> Union[bytes, str, Generator]:
+ if version == Versions.v1:
+ if not isinstance(request, TTSRequest):
+ raise ValueError(ErrorsMessage.tts_v1_request_error)
+ if request.speaker not in VALID_SPEAKER_IDS:
+ raise SpeakerError(
+ ErrorsMessage.speaker_id_error.format(
+ VALID_SPEAKER_IDS, request.speaker
+ )
+ )
+ else:
+ if not isinstance(request, TTSRequestV2):
+ raise ValueError(ErrorsMessage.tts_v2_request_error)
+ speaker_found = any(
+ speaker.lower() in request.description.lower()
+ for speaker in VALID_TTS_V2_SPEAKERS
+ )
+ if not speaker_found:
+ raise SpeakerError(
+ ErrorsMessage.speaker_description_error.format(
+ VALID_TTS_V2_SPEAKERS
+ )
+ )
+
+ if not stream:
+ data = request.dict()
+ response = self.client._make_request(
+ method=DjeliaRequest.tts.method,
+ endpoint=DjeliaRequest.tts.endpoint.format(version.value),
+ json=data,
+ )
+
+ if output_file:
+ try:
+ with open(output_file, "wb") as f:
+ f.write(response.content)
+ return output_file
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+ else:
+ return response.content
+ else:
+ if version == Versions.v1:
+ raise ValueError(ErrorsMessage.tts_streaming_conpatibility)
+ return self._stream_text_to_speech(request, output_file, version)
+
+ def _stream_text_to_speech(
+ self,
+ request: TTSRequestV2,
+ output_file: Optional[str] = None,
+ version: Optional[Versions] = Versions.v2,
+ ) -> Generator[bytes, None, None]:
+ data = request.dict()
+ response = self.client._make_request(
+ method=DjeliaRequest.tts_stream.method,
+ endpoint=DjeliaRequest.tts_stream.endpoint.format(version.value),
+ json=data,
+ stream=True,
+ )
+
+ audio_chunks = []
+ for chunk in response.iter_content(chunk_size=8192):
+ if chunk:
+ audio_chunks.append(chunk)
+ yield chunk
+
+ if output_file:
+ try:
+ with open(output_file, "wb") as f:
+ for chunk in audio_chunks:
+ f.write(chunk)
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+
+
+class AsyncTTS:
+ def __init__(self, client):
+ self.client = client
+
+ async def text_to_speech(
+ self,
+ request: Union[TTSRequest, TTSRequestV2],
+ output_file: Optional[str] = None,
+ stream: Optional[bool] = False,
+ version: Optional[Versions] = Versions.v1,
+ ) -> Union[bytes, str, AsyncGenerator]:
+ if version == Versions.v1:
+ if not isinstance(request, TTSRequest):
+ raise ValueError(ErrorsMessage.tts_v1_request_error)
+ if request.speaker not in VALID_SPEAKER_IDS:
+ raise SpeakerError(
+ ErrorsMessage.speaker_id_error.format(
+ VALID_SPEAKER_IDS, request.speaker
+ )
+ )
+ else:
+ if not isinstance(request, TTSRequestV2):
+ raise ValueError(ErrorsMessage.tts_v2_request_error)
+ speaker_found = any(
+ speaker.lower() in request.description.lower()
+ for speaker in VALID_TTS_V2_SPEAKERS
+ )
+ if not speaker_found:
+ raise SpeakerError(
+ ErrorsMessage.speaker_description_error.format(
+ VALID_TTS_V2_SPEAKERS
+ )
+ )
+
+ if not stream:
+ request_data = request.dict()
+ content = await self.client._make_request(
+ method=DjeliaRequest.tts.method,
+ endpoint=DjeliaRequest.tts.endpoint.format(version.value),
+ json=request_data,
+ )
+
+ if output_file:
+ try:
+ with open(output_file, "wb") as f:
+ f.write(content)
+ return output_file
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+ else:
+ return content
+ else:
+ if version == Versions.v1:
+ raise ValueError(ErrorsMessage.tts_streaming_conpatibility)
+ # FIXED: Remove 'await' here - async generators should not be awaited when returned
+ return self._stream_text_to_speech(request, output_file, version)
+
+ async def _stream_text_to_speech(
+ self,
+ request: TTSRequestV2,
+ output_file: Optional[str] = None,
+ version: Optional[Versions] = Versions.v2,
+ ) -> AsyncGenerator[bytes, None]:
+ request_data = request.dict()
+ response = await self.client._make_streaming_request(
+ method=DjeliaRequest.tts_stream.method,
+ endpoint=DjeliaRequest.tts_stream.endpoint.format(version.value),
+ json=request_data,
+ )
+
+ audio_chunks = []
+ try:
+ async for chunk in response.content.iter_chunked(8192):
+ if chunk:
+ audio_chunks.append(chunk)
+ yield chunk
+ finally:
+ await response.close()
+
+ if output_file:
+ try:
+ with open(output_file, "wb") as f:
+ for chunk in audio_chunks:
+ f.write(chunk)
+ except IOError as e:
+ raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+
+
+# from typing import Union, Optional, Generator, AsyncGenerator
+# from djelia.models import (DjeliaRequest, TTSRequest, TTSRequestV2, Versions, ErrorsMessage)
+# from djelia.config.settings import VALID_SPEAKER_IDS, VALID_TTS_V2_SPEAKERS
+# from djelia.utils.exceptions import SpeakerError
+
+# class TTS:
+# def __init__(self, client):
+# self.client = client
+
+# def text_to_speech(self, request: Union[TTSRequest, TTSRequestV2],
+# output_file: Optional[str] = None,
+# stream: Optional[bool] = False,
+# version: Optional[Versions] = Versions.v1) -> Union[bytes, str, Generator]:
+
+# if version == Versions.v1:
+# if not isinstance(request, TTSRequest):
+# raise ValueError(ErrorsMessage.tts_v1_request_error)
+# if request.speaker not in VALID_SPEAKER_IDS:
+# raise SpeakerError(ErrorsMessage.speaker_id_error.format(VALID_SPEAKER_IDS, request.speaker))
+# else:
+# if not isinstance(request, TTSRequestV2):
+# raise ValueError(ErrorsMessage.tts_v2_request_error)
+# speaker_found = any(speaker.lower() in request.description.lower() for speaker in VALID_TTS_V2_SPEAKERS)
+# if not speaker_found:
+# raise SpeakerError(ErrorsMessage.speaker_description_error.format(VALID_TTS_V2_SPEAKERS))
+
+# if not stream:
+# data = request.dict()
+# response = self.client._make_request(
+# method=DjeliaRequest.tts.method,
+# endpoint=DjeliaRequest.tts.endpoint.format(version.value),
+# json=data
+# )
+
+# if output_file:
+# try:
+# with open(output_file, 'wb') as f:
+# f.write(response.content)
+# return output_file
+# except IOError as e:
+# raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+# else:
+# return response.content
+# else:
+# if version == Versions.v1:
+# raise ValueError(ErrorsMessage.tts_streaming_conpatibility)
+# return self._stream_text_to_speech(request, output_file, version)
+
+# def _stream_text_to_speech(self, request: TTSRequestV2,
+# output_file: Optional[str] = None,
+# version: Optional[Versions] = Versions.v2) -> Generator[bytes, None, None]:
+# data = request.dict()
+# response = self.client._make_request(
+# method=DjeliaRequest.tts_stream.method,
+# endpoint=DjeliaRequest.tts_stream.endpoint.format(version.value),
+# json=data,
+# stream=True
+# )
+
+# audio_chunks = []
+# for chunk in response.iter_content(chunk_size=8192):
+# if chunk:
+# audio_chunks.append(chunk)
+# yield chunk
+
+# if output_file:
+# try:
+# with open(output_file, 'wb') as f:
+# for chunk in audio_chunks:
+# f.write(chunk)
+# except IOError as e:
+# raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+
+
+# class AsyncTTS:
+# def __init__(self, client):
+# self.client = client
+
+# async def text_to_speech(self, request: Union[TTSRequest, TTSRequestV2],
+# output_file: Optional[str] = None,
+# stream: Optional[bool] = False,
+# version: Optional[Versions] = Versions.v1) -> Union[bytes, str, AsyncGenerator]:
+
+# if version == Versions.v1:
+# if not isinstance(request, TTSRequest):
+# raise ValueError(ErrorsMessage.tts_v1_request_error)
+# if request.speaker not in VALID_SPEAKER_IDS:
+# raise SpeakerError(ErrorsMessage.speaker_id_error.format(VALID_SPEAKER_IDS, request.speaker))
+# else: # This implies version is v2 or other future versions supporting TTSRequestV2
+# if not isinstance(request, TTSRequestV2):
+# raise ValueError(ErrorsMessage.tts_v2_request_error)
+# speaker_found = any(speaker.lower() in request.description.lower() for speaker in VALID_TTS_V2_SPEAKERS)
+# if not speaker_found:
+# raise SpeakerError(ErrorsMessage.speaker_description_error.format(VALID_TTS_V2_SPEAKERS))
+
+# if not stream:
+# request_data = request.dict()
+# content = await self.client._make_request(
+# method=DjeliaRequest.tts.method,
+# endpoint=DjeliaRequest.tts.endpoint.format(version.value),
+# json=request_data
+# )
+
+# if output_file:
+# try:
+# with open(output_file, 'wb') as f:
+# f.write(content)
+# return output_file
+# except IOError as e:
+# raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+# else:
+# return content
+# else: # stream is True
+# if version == Versions.v1:
+# raise ValueError(ErrorsMessage.tts_streaming_conpatibility)
+# # MODIFIED LINE: Removed 'await'
+# return self._stream_text_to_speech(request, output_file, version)
+
+
+# async def _stream_text_to_speech(self, request: TTSRequestV2,
+# output_file: Optional[str] = None,
+# version: Optional[Versions] = Versions.v2) -> AsyncGenerator[bytes, None]:
+# request_data = request.dict()
+# response = await self.client._make_streaming_request(
+# method=DjeliaRequest.tts_stream.method,
+# endpoint=DjeliaRequest.tts_stream.endpoint.format(version.value),
+# json=request_data
+# )
+
+# audio_chunks = []
+# try:
+# async for chunk in response.content.iter_chunked(8192):
+# if chunk:
+# audio_chunks.append(chunk)
+# yield chunk
+# finally:
+# await response.close() # Ensure the response is closed
+
+# if output_file:
+# try:
+# with open(output_file, 'wb') as f:
+# for chunk in audio_chunks:
+# f.write(chunk)
+# except IOError as e:
+# raise IOError(ErrorsMessage.ioerror_save.format(str(e)))
+
+
+# from typing import Union, Optional, Generator, AsyncGenerator
+# from djelia.models import (DjeliaRequest, TTSRequest, TTSRequestV2, Versions, ErrorsMessage)
+# from djelia.config.settings import VALID_SPEAKER_IDS, VALID_TTS_V2_SPEAKERS
+# from djelia.utils.exceptions import SpeakerError
diff --git a/djelia/utils/errors.py b/djelia/utils/errors.py
new file mode 100644
index 0000000..c0cc2b7
--- /dev/null
+++ b/djelia/utils/errors.py
@@ -0,0 +1,35 @@
+from typing import Any, Dict
+
+from djelia.utils.exceptions import (APIError, AuthenticationError,
+ DjeliaError, ValidationError)
+
+
+class ExceptionMessage:
+ messages: Dict[int, str] = {
+ 401: "Invalid or expired API key",
+ 403: "Forbidden: You do not have permission to access this resource",
+ 404: "Resource not found",
+ 422: "Validation error",
+ }
+ default: str = "API error {}"
+ failed: str = "Request failed: {}"
+
+
+class CodeStatusExceptions:
+ exceptions: Dict[int, Any] = {
+ 401: AuthenticationError,
+ 403: APIError,
+ 404: APIError,
+ 422: ValidationError,
+ }
+ default = DjeliaError
+
+
+def api_exception(code: int, error: Exception) -> Exception:
+ return CodeStatusExceptions.exceptions.get(code, APIError)(
+ ExceptionMessage.messages.get(code, ExceptionMessage.default.format(str(error)))
+ )
+
+
+def general_exception(error: Exception) -> Exception:
+ return CodeStatusExceptions.default(ExceptionMessage.failed.format(str(error)))
diff --git a/djelia/exceptions.py b/djelia/utils/exceptions.py
similarity index 81%
rename from djelia/exceptions.py
rename to djelia/utils/exceptions.py
index ff61bda..41f2f1e 100644
--- a/djelia/exceptions.py
+++ b/djelia/utils/exceptions.py
@@ -1,31 +1,37 @@
-# I will come back to this part but for now let's keep it like this
class DjeliaError(Exception):
"""Base exception for all Djelia client errors"""
+
pass
+
class AuthenticationError(DjeliaError):
"""Exception raised for authentication errors"""
+
pass
+
class ValidationError(DjeliaError):
"""Exception raised for validation errors"""
+
pass
+
class APIError(DjeliaError):
"""Exception raised for API errors"""
+
def __init__(self, status_code, message, *args):
self.status_code = status_code
self.message = message
super().__init__(f"API Error ({status_code}): {message}", *args)
+
class LanguageError(ValidationError):
"""Exception raised for unsupported languages"""
+
pass
+
class SpeakerError(ValidationError):
"""Exception raised for invalid speaker IDs"""
- pass
-class AudioFormatError(ValidationError):
- """Exception raised for unsupported audio formats"""
- pass
\ No newline at end of file
+ pass
diff --git a/djelia/utils/logger.py b/djelia/utils/logger.py
new file mode 100644
index 0000000..e69de29
diff --git a/fix-code.sh b/fix-code.sh
new file mode 100755
index 0000000..3d7721e
--- /dev/null
+++ b/fix-code.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+set -e
+
+echo "🔧 Running code style fix..."
+
+ruff check djelia cookbook test setup.py --fix
+ruff check djelia cookbook test setup.py
+isort check djelia cookbook test setup.py
+black .
+
+git add check djelia cookbook test setup.py
+git commit -m "code style ✅ : auto format (ruff, isort, black)" || echo "✅ Nothing to commit"
+
+echo "✅ Done!"
diff --git a/setup.py b/setup.py
index a206f0b..587814d 100644
--- a/setup.py
+++ b/setup.py
@@ -1,41 +1,90 @@
-from setuptools import setup, find_packages
+import os
-with open("README.md", "r", encoding="utf-8") as fh:
- long_description = fh.read()
+from setuptools import find_packages, setup
+
+this_directory = os.path.abspath(os.path.dirname(__file__))
+with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f:
+ long_description = f.read()
setup(
name="djelia",
- version="0.2.0",
+ version="1.0.1",
author="Djelia",
author_email="support@djelia.cloud",
- description="Python client for Djelia API, providing access to linguistic models for Bambara languages",
+ description="Djelia Python SDK - Advanced AI for African Languages",
long_description=long_description,
long_description_content_type="text/markdown",
- url="https://github.com/djelia-org/djelia-python-client",
-
+ url="https://github.com/djelia-org/djelia-python-sdk",
+ project_urls={
+ "Bug Tracker": "https://github.com/djelia-org/djelia-python-sdk/issues",
+ "Documentation": "https://djelia.cloud/docs",
+ "Source Code": "https://github.com/djelia-org/djelia-python-sdk",
+ "Homepage": "https://djelia.cloud",
+ },
packages=find_packages(),
classifiers=[
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ "Topic :: Text Processing :: Linguistic",
+ "Topic :: Scientific/Engineering :: Artificial Intelligence",
+ "Topic :: Multimedia :: Sound/Audio :: Speech",
+ "License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
- "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python :: 3.12",
"Operating System :: OS Independent",
- "Development Status :: 4 - Beta",
- "Intended Audience :: Developers",
- "Topic :: Software Development :: Libraries :: Python Modules",
- "Topic :: Text Processing :: Linguistic",
],
python_requires=">=3.7",
install_requires=[
"requests>=2.25.0",
- "aiohttp>=3.7.0",
+ "aiohttp>=3.8.0",
+ "pydantic>=1.8.0,<2.0.0",
+ "tenacity>=9.1.2",
],
- keywords="djelia, nlp, translation, transcription, text-to-speech, african languages, bambara, mali, async",
- project_urls={
- "Documentation": "https://djelia.cloud/docs",
- "Bug Tracker": "https://github.com/djelia-org/djelia-python-client/issues",
+ extras_require={
+ "dev": [
+ "pytest>=6.0",
+ "pytest-asyncio>=0.18.0",
+ "black>=21.0.0",
+ "flake8>=3.9.0",
+ "mypy>=0.910",
+ "isort>=5.0.0",
+ "ruff>=0.11.4",
+ "pre-commit>=2.15.0",
+ ],
+ "docs": [
+ "sphinx>=4.0.0",
+ "sphinx-rtd-theme>=0.5.0",
+ ],
+ "test": [
+ "pytest>=6.0",
+ "pytest-asyncio>=0.18.0",
+ "python-dotenv>=0.19.0",
+ ],
},
-)
\ No newline at end of file
+ keywords=[
+ "djelia",
+ "nlp",
+ "translation",
+ "transcription",
+ "text-to-speech",
+ "tts",
+ "african languages",
+ "bambara",
+ "mali",
+ "async",
+ "ai",
+ "machine learning",
+ "speech recognition",
+ "voice synthesis",
+ "multilingual",
+ "linguistics",
+ ],
+ include_package_data=True,
+ zip_safe=False,
+)
diff --git a/test/test.py b/test/test.py
deleted file mode 100644
index db71bfe..0000000
--- a/test/test.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from djelia import Djelia
-
-
-client = Djelia(api_key="API key here")
-
-languages = client.get_supported_languages()
-print("testing supported languages endpoint........")
-print("Supported languages:", languages)
-
-
-print("testing translatino endpoint........")
-translation = client.translate(
- text="Hello, how are you doing today?",
- source="en",
- target="bam"
-)
-print("Translation:", translation["text"])
-
-
-print("testing transcription v1 endpoint........")
-result_v1 = client.transcribe("test1.wav")
-for segment in result_v1:
- print(f"V1 Transcription: {segment['text']} ({segment['start']} - {segment['end']})")
-
-print("testing transcription v2 endpoint........")
-result_v2 = client.transcribe("test1.wav", version=2)
-for segment in result_v2:
- print(f"V2 Transcription: {segment['text']} ({segment['start']} - {segment['end']})")
-
-
-print("testing straming with transcription v2 endpoint........")
-# Streaming transcription with V2 API
-for segment in client.stream_transcribe("test1.wav", version=2):
- print(f"V2 Streaming: {segment['text']} ({segment.get('start', 'N/A')} - {segment.get('end', 'N/A')})")
-
-
-print("testing TTS endpoint........")
-# Text-to-speech with specific speaker voice
-audio_path = client.text_to_speech(
- text="Aw ni ce",
- speaker=1,
- output_file="greeting.wav"
-)
-print(f"Audio saved to: {audio_path}")
-
-print("testing speach translation with v2 endpoint........")
-# Transcribe with translation to French (works with both V1 and V2)
-french_result = client.transcribe("test1.wav", translate_to_french=True, version=2)
-print(f"French translation: {french_result['text']}")
-
-
-