Enabled shared speech profiles for realtime multi-speaker identification #3989
…Implemented share/revoke of speech profiles and wired shared embeddings, with staging smoke tests.
Code Review
This pull request introduces a significant new feature for sharing speech profiles and integrating them into the real-time speaker identification pipeline. The implementation is well-structured, with new database helpers, API endpoints, and updates to the websocket handler. My review focuses on improving robustness and efficiency. I've identified several areas where error handling can be improved to prevent silent failures, particularly around database access and Redis communication. I've also suggested refactoring to address code duplication and a performance optimization for the real-time update mechanism to make it more scalable. All original comments have been retained as they align with best practices and are not contradicted by any provided rules.
    try:
        from database.users import get_people

        all_people = get_people(uid)
        if all_people:
            person_doc = all_people[0]
    except Exception:
        person_doc = None
The try...except Exception block is too broad and silently ignores any errors that might occur when fetching the user's people from the database. If an error occurs (e.g., a database connection issue), person_doc will be None, and the profile will be shared with an empty embedding without any indication of failure. This can lead to unexpected behavior and makes debugging difficult. The exception should be logged.
    except Exception as e:
        print(f"Failed to get people for user {uid} when sharing profile: {e}")
        person_doc = None
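As a variation on the suggestion above, the lookup can be wrapped in a small function that logs the full traceback through the standard `logging` module instead of `print`. This is only a sketch: `first_person_or_none` and the logger name are hypothetical, and `get_people` is passed in as a parameter so the example runs without the project's database module.

```python
import logging

logger = logging.getLogger("speech_profile")  # hypothetical logger name


def first_person_or_none(get_people, uid):
    """Return the first person document for uid, or None if there is none.

    get_people is any callable taking a uid and returning a list of dicts;
    in the PR this role is played by database.users.get_people.
    """
    try:
        people = get_people(uid)
    except Exception:
        # logger.exception records the message plus the active traceback,
        # so a database failure is visible instead of silently ignored.
        logger.exception("Failed to get people for user %s when sharing profile", uid)
        return None
    return people[0] if people else None
```

Returning `None` keeps the original fallback behavior while still leaving a record of why the lookup failed.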
backend/routers/speech_profile.py
Outdated
    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {'action': 'add', 'source_uid': uid, 'name': person_name}
        redis_db.r.publish(channel, json.dumps(payload))
    except Exception:
        pass
The try...except Exception: pass block silently ignores failures when publishing to Redis. If Redis is unavailable, the notification for the shared profile will not be sent, and active listening sessions will not be updated in real-time. This undermines a key aspect of the feature. The exception should be logged to aid in debugging.
    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {'action': 'add', 'source_uid': uid, 'name': person_name}
        redis_db.r.publish(channel, json.dumps(payload))
    except Exception as e:
        print(f"Failed to publish 'add' shared profile notification to Redis for user {target_uid}: {e}")
backend/routers/speech_profile.py
Outdated
    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {'action': 'remove', 'source_uid': uid}
        redis_db.r.publish(channel, json.dumps(payload))
    except Exception:
        pass
Similar to the share endpoint, the try...except Exception: pass block here silently ignores failures when publishing the 'revoke' notification to Redis. This can lead to stale data in active sessions if Redis is down. The exception should be logged.
    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {'action': 'remove', 'source_uid': uid}
        redis_db.r.publish(channel, json.dumps(payload))
    except Exception as e:
        print(f"Failed to publish 'remove' shared profile notification to Redis for user {target_uid}: {e}")
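Since the share and revoke endpoints publish to the same channel with the same error handling, one possible way to apply both logging suggestions at once is a shared helper. The sketch below is an assumption, not code from the PR: `notify_shared_profile` is a hypothetical name, and `client` stands for any object with a `publish(channel, message)` method, which is how `redis_db.r` is used in the snippets above.

```python
import json
import logging

logger = logging.getLogger("speech_profile")  # hypothetical logger name


def notify_shared_profile(client, target_uid, payload):
    """Publish payload to the target user's shared-profiles channel.

    Returns True if the message was published and False if publishing
    failed. Failures are logged rather than swallowed.
    """
    channel = f"users:{target_uid}:shared_profiles"
    try:
        client.publish(channel, json.dumps(payload))
        return True
    except Exception:
        logger.exception(
            "Failed to publish %r shared profile notification for user %s",
            payload.get("action"),
            target_uid,
        )
        return False
```

The share endpoint would call it with `{'action': 'add', ...}` and the revoke endpoint with `{'action': 'remove', ...}`; the boolean result lets a caller decide whether a failed notification should be surfaced.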
backend/routers/transcribe.py
Outdated
    # Also load shared people (profiles other users shared with this user)
    try:
        shared = user_db.get_shared_people(uid)
        for s in shared:
            sid = s.get('id')
            emb = s.get('speaker_embedding')
            name = s.get('name') or f"Shared-{sid}"
            if emb:
                # Use a namespaced id to avoid collision with local people ids
                ns_id = f"shared_{sid}"
                person_embeddings_cache[ns_id] = {
                    'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                    'name': name,
                }
    except Exception as e:
        print(f"Speaker ID: failed loading shared people: {e}", uid, session_id)
The logic for loading shared people and populating the person_embeddings_cache is duplicated here and inside the _shared_profiles_listener (lines 1391-1402). This duplication makes the code harder to maintain, as changes might be applied to one location but missed in the other, potentially introducing bugs.
To improve maintainability, this logic should be extracted into a helper function. For example:
    def _load_shared_people_into_cache(uid: str, session_id: str, cache: dict):
        # session_id is passed in explicitly so the log line below does not
        # depend on a variable from the caller's scope.
        try:
            shared = user_db.get_shared_people(uid)
            for s in shared:
                sid = s.get('id')
                emb = s.get('speaker_embedding')
                name = s.get('name') or f"Shared-{sid}"
                if emb:
                    ns_id = f"shared_{sid}"
                    cache[ns_id] = {
                        'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                        'name': name,
                    }
        except Exception as e:
            print(f"Speaker ID: failed loading shared people: {e}", uid, session_id)

    # ... then call it in both places:
    _load_shared_people_into_cache(uid, session_id, person_embeddings_cache)

    try:
        payload = json.loads(msg)
        action = payload.get('action')
        source_uid = payload.get('source_uid')
        if action == 'add':
            # reload shared people
            shared = user_db.get_shared_people(uid)
            for s in shared:
                sid = s.get('id')
                emb = s.get('speaker_embedding')
                name = s.get('name') or f"Shared-{sid}"
                if emb:
                    ns_id = f"shared_{sid}"
                    person_embeddings_cache[ns_id] = {
                        'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                        'name': name,
                    }
        elif action == 'remove':
            ns_id = f"shared_{source_uid}"
            if ns_id in person_embeddings_cache:
                del person_embeddings_cache[ns_id]
    except Exception:
        continue
The try...except Exception: continue block in the Redis listener will silently ignore any errors during message processing. This includes JSON decoding errors, database errors when reloading profiles, or any other unexpected issues. If an error occurs, the person_embeddings_cache will not be updated, leading to stale data and incorrect speaker identification, without any log of what went wrong. The exception should be logged, along with the message payload, for easier debugging.
    try:
        payload = json.loads(msg)
        action = payload.get('action')
        source_uid = payload.get('source_uid')
        if action == 'add':
            # reload shared people
            shared = user_db.get_shared_people(uid)
            for s in shared:
                sid = s.get('id')
                emb = s.get('speaker_embedding')
                name = s.get('name') or f"Shared-{sid}"
                if emb:
                    ns_id = f"shared_{sid}"
                    person_embeddings_cache[ns_id] = {
                        'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                        'name': name,
                    }
        elif action == 'remove':
            ns_id = f"shared_{source_uid}"
            if ns_id in person_embeddings_cache:
                del person_embeddings_cache[ns_id]
    except Exception as e:
        print(f"Error processing shared profile message for user {uid}: {e}. Payload: {msg}")
        continue

    if action == 'add':
        # reload shared people
        shared = user_db.get_shared_people(uid)
        for s in shared:
            sid = s.get('id')
            emb = s.get('speaker_embedding')
            name = s.get('name') or f"Shared-{sid}"
            if emb:
                ns_id = f"shared_{sid}"
                person_embeddings_cache[ns_id] = {
                    'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                    'name': name,
                }
The current implementation for handling the add action in the Redis listener is inefficient. It re-fetches and re-processes all shared profiles from the database every time a single new profile is shared. This can lead to unnecessary database load and processing, especially if a user has many shared profiles.
A more efficient approach would be to include the new profile's data (including the embedding) directly in the Redis message payload. This would eliminate the need for a database query in the listener.
1. In backend/routers/speech_profile.py:
Modify the share_speech_profile_with_user function to include the embedding in the payload:
    # ...
    person_name = name or (person_doc.get('name') if person_doc else 'Unknown')
    add_shared_person_to_user(target_uid, uid, person_name, embedding or [], profile_url)
    # Notify target user's active sessions via Redis pubsub
    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {
            'action': 'add',
            'source_uid': uid,
            'name': person_name,
            'speaker_embedding': embedding or []
        }
        redis_db.r.publish(channel, json.dumps(payload))
    # ...
2. In backend/routers/transcribe.py:
Update the _shared_profiles_listener to use the data from the payload directly:
    # ...
    if action == 'add':
        source_uid = payload.get('source_uid')
        emb = payload.get('speaker_embedding')
        name = payload.get('name') or f"Shared-{source_uid}"
        if emb and source_uid:
            ns_id = f"shared_{source_uid}"
            person_embeddings_cache[ns_id] = {
                'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                'name': name,
            }
    # ...
This change will make the real-time update much more efficient.
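To make the payload-driven update easier to test in isolation, the listener's per-message logic could be pulled into a pure function along the following lines. This is a sketch under assumptions: `apply_shared_profile_message` is a hypothetical name, the payload fields follow the proposal above, and embeddings are kept as plain Python lists here so the example has no dependencies (the PR itself stores `np.float32` arrays).

```python
import json
import logging

logger = logging.getLogger("transcribe")  # hypothetical logger name


def apply_shared_profile_message(cache, msg):
    """Apply one pub/sub message to the embedding cache.

    Returns True if the cache changed, False otherwise. Malformed
    messages are logged together with the raw payload, not ignored.
    """
    try:
        payload = json.loads(msg)
        action = payload.get("action")
        source_uid = payload.get("source_uid")
        if not source_uid:
            raise ValueError("message has no source_uid")
        ns_id = f"shared_{source_uid}"  # same namespacing as the diff above
        if action == "add":
            emb = payload.get("speaker_embedding")
            if not emb:
                return False  # nothing to match against, so skip the entry
            cache[ns_id] = {
                "embedding": [float(x) for x in emb],
                "name": payload.get("name") or f"Shared-{source_uid}",
            }
            return True
        if action == "remove":
            return cache.pop(ns_id, None) is not None
        return False
    except Exception:
        logger.exception("Error processing shared profile message: %r", msg)
        return False
```

Inside the listener loop, each received message would then be handled by a single call such as `apply_shared_profile_message(person_embeddings_cache, msg)`, with no database query on the hot path.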
This PR enables sharing and revocation of speech/person profiles across Omi users and integrates shared profiles into the realtime /v4/listen speaker-identification pipeline. Once a mate’s profile is shared and has an embedding, Omi can automatically identify them by name in live conversations without repeated manual labeling.

What I changed

Database helpers
- shared_people collection.

Speech profile APIs
- POST /v3/speech-profile/share: Share a person/profile with another Omi user after validating ownership.
- POST /v3/speech-profile/revoke: Revoke a previously shared profile.
- GET /v3/speech-profile/shared: List profiles shared with the authenticated user.

Realtime /v4/listen integration
- shared_people into the in-memory speaker embedding cache.
- SPEAKER_MATCH_THRESHOLD for consistent behavior.

Smoke testing
- backend/scripts/SMOKE_TEST.md with step-by-step staging validation instructions.
- backend/scripts/smoke_shared_profile_test.py

Previously, only the session owner’s voice profile could be learned and identified automatically. This limited automation in multi-person conversations. By allowing speech profiles to be shared (and revoked) across users and wiring them into the realtime speaker-identification flow, Omi can now identify mates by name automatically after a one-time setup.
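The realtime integration described above relies on comparing a live speaker embedding against every entry in the in-memory cache, local and shared alike. The PR text does not show the matching code, so the following is only an illustrative sketch: the use of cosine distance, the threshold value, and the function names are all assumptions, and plain lists stand in for the NumPy arrays used in the backend.

```python
import math

# Hypothetical value; the real SPEAKER_MATCH_THRESHOLD is defined in the backend.
SPEAKER_MATCH_THRESHOLD = 0.45


def cosine_distance(a, b):
    """Return 1 - cosine similarity; 1.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 1.0
    return 1.0 - dot / (norm_a * norm_b)


def match_speaker(embedding, cache, threshold=SPEAKER_MATCH_THRESHOLD):
    """Return the cache id of the closest entry under threshold, else None."""
    best_id, best_dist = None, threshold
    for person_id, entry in cache.items():
        dist = cosine_distance(embedding, entry["embedding"])
        if dist < best_dist:
            best_id, best_dist = person_id, dist
    return best_id
```

Because shared profiles are stored under `shared_`-prefixed ids in the same cache, a single pass like this would treat them exactly like the session owner's own people, which is what applying one threshold "for consistent behavior" suggests.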
Testing