Skip to content

Conversation

@advaithh04
Copy link

This PR enables sharing and revocation of speech/person profiles across Omi users and integrates shared profiles into the realtime /v4/listen speaker-identification pipeline. Once a mate’s profile is shared and has an embedding, Omi can automatically identify them by name in live conversations without repeated manual labeling.

What I changed

  • Database helpers

    • Added helpers to persist, fetch, and remove shared speech/person profiles under a recipient user’s shared_people collection.
    • Shared entries store name, profile URL, and speaker embedding (when available), providing a canonical source for cross-user matching.
  • Speech profile APIs

    • POST /v3/speech-profile/share: Share a person/profile with another Omi user after validating ownership.
    • POST /v3/speech-profile/revoke: Revoke a previously shared profile.
    • GET /v3/speech-profile/shared: List profiles shared with the authenticated user.
    • Share/revoke actions publish Redis notifications for immediate propagation to active sessions.
  • Realtime /v4/listen integration

    • Active listen sessions now load both local people and shared_people into the in-memory speaker embedding cache.
    • Added an async Redis pubsub listener to update the cache in realtime on share/revoke events, so active sessions pick up changes without restarting.
    • Shared profiles are namespaced to avoid ID collisions.
    • Matching reuses the existing embedding comparison logic and SPEAKER_MATCH_THRESHOLD for consistent behavior.
  • Smoke testing

    • Added backend/scripts/SMOKE_TEST.md with step-by-step staging validation instructions.
    • Added backend/scripts/smoke_shared_profile_test.py

Previously, only the session owner’s voice profile could be learned and identified automatically. This limited automation in multi-person conversations. By allowing speech profiles to be shared (and revoked) across users and wiring them into the realtime speaker-identification flow, Omi can now identify mates by name automatically after a one-time setup.

Testing

  • Manual code review and logical validation of persistence, Redis pubsub flow, and listen-session cache updates.
  • Smoke-test script and documentation added for staging verification.
  • Full end-to-end testing (live audio, embeddings API, Firestore/GCS/Redis) must be run in staging with real credentials.

…Implemented share/revoke of speech profiles and wired shared embeddings, with staging smoke tests.
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant new feature for sharing speech profiles and integrating them into the real-time speaker identification pipeline. The implementation is well-structured, with new database helpers, API endpoints, and updates to the websocket handler. My review focuses on improving robustness and efficiency. I've identified several areas where error handling can be improved to prevent silent failures, particularly around database access and Redis communication. I've also suggested refactoring to address code duplication and a performance optimization for the real-time update mechanism to make it more scalable. All original comments have been retained as they align with best practices and are not contradicted by any provided rules.

Comment on lines 125 to 132
try:
from database.users import get_people

all_people = get_people(uid)
if all_people:
person_doc = all_people[0]
except Exception:
person_doc = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The try...except Exception block is too broad and silently ignores any errors that might occur when fetching the user's people from the database. If an error occurs (e.g., a database connection issue), person_doc will be None, and the profile will be shared with an empty embedding without any indication of failure. This can lead to unexpected behavior and makes debugging difficult. The exception should be logged.

        except Exception as e:
            print(f"Failed to get people for user {uid} when sharing profile: {e}")
            person_doc = None

Comment on lines 141 to 146
try:
channel = f'users:{target_uid}:shared_profiles'
payload = {'action': 'add', 'source_uid': uid, 'name': person_name}
redis_db.r.publish(channel, json.dumps(payload))
except Exception:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The try...except Exception: pass block silently ignores failures when publishing to Redis. If Redis is unavailable, the notification for the shared profile will not be sent, and active listening sessions will not be updated in real-time. This undermines a key aspect of the feature. The exception should be logged to aid in debugging.

    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {'action': 'add', 'source_uid': uid, 'name': person_name}
        redis_db.r.publish(channel, json.dumps(payload))
    except Exception as e:
        print(f"Failed to publish 'add' shared profile notification to Redis for user {target_uid}: {e}")

Comment on lines 155 to 160
try:
channel = f'users:{target_uid}:shared_profiles'
payload = {'action': 'remove', 'source_uid': uid}
redis_db.r.publish(channel, json.dumps(payload))
except Exception:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the share endpoint, the try...except Exception: pass block here silently ignores failures when publishing the 'revoke' notification to Redis. This can lead to stale data in active sessions if Redis is down. The exception should be logged.

    try:
        channel = f'users:{target_uid}:shared_profiles'
        payload = {'action': 'remove', 'source_uid': uid}
        redis_db.r.publish(channel, json.dumps(payload))
    except Exception as e:
        print(f"Failed to publish 'remove' shared profile notification to Redis for user {target_uid}: {e}")

Comment on lines 1346 to 1361
# Also load shared people (profiles other users shared with this user)
try:
shared = user_db.get_shared_people(uid)
for s in shared:
sid = s.get('id')
emb = s.get('speaker_embedding')
name = s.get('name') or f"Shared-{sid}"
if emb:
# Use a namespaced id to avoid collision with local people ids
ns_id = f"shared_{sid}"
person_embeddings_cache[ns_id] = {
'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
'name': name,
}
except Exception as e:
print(f"Speaker ID: failed loading shared people: {e}", uid, session_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic for loading shared people and populating the person_embeddings_cache is duplicated here and inside the _shared_profiles_listener (lines 1391-1402). This duplication makes the code harder to maintain, as changes might be applied to one location but missed in the other, potentially introducing bugs.

To improve maintainability, this logic should be extracted into a helper function. For example:

def _load_shared_people_into_cache(uid: str, cache: dict):
    try:
        shared = user_db.get_shared_people(uid)
        for s in shared:
            sid = s.get('id')
            emb = s.get('speaker_embedding')
            name = s.get('name') or f"Shared-{sid}"
            if emb:
                ns_id = f"shared_{sid}"
                cache[ns_id] = {
                    'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                    'name': name,
                }
    except Exception as e:
        print(f"Speaker ID: failed loading shared people: {e}", uid, session_id)

# ... then call it in both places:
_load_shared_people_into_cache(uid, person_embeddings_cache)

Comment on lines 1386 to 1408
try:
payload = json.loads(msg)
action = payload.get('action')
source_uid = payload.get('source_uid')
if action == 'add':
# reload shared people
shared = user_db.get_shared_people(uid)
for s in shared:
sid = s.get('id')
emb = s.get('speaker_embedding')
name = s.get('name') or f"Shared-{sid}"
if emb:
ns_id = f"shared_{sid}"
person_embeddings_cache[ns_id] = {
'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
'name': name,
}
elif action == 'remove':
ns_id = f"shared_{source_uid}"
if ns_id in person_embeddings_cache:
del person_embeddings_cache[ns_id]
except Exception:
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The try...except Exception: continue block in the Redis listener will silently ignore any errors during message processing. This includes JSON decoding errors, database errors when reloading profiles, or any other unexpected issues. If an error occurs, the person_embeddings_cache will not be updated, leading to stale data and incorrect speaker identification, without any log of what went wrong. The exception should be logged, along with the message payload, for easier debugging.

                    try:
                        payload = json.loads(msg)
                        action = payload.get('action')
                        source_uid = payload.get('source_uid')
                        if action == 'add':
                            # reload shared people
                            shared = user_db.get_shared_people(uid)
                            for s in shared:
                                sid = s.get('id')
                                emb = s.get('speaker_embedding')
                                name = s.get('name') or f"Shared-{sid}"
                                if emb:
                                    ns_id = f"shared_{sid}"
                                    person_embeddings_cache[ns_id] = {
                                        'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
                                        'name': name,
                                    }
                        elif action == 'remove':
                            ns_id = f"shared_{source_uid}"
                            if ns_id in person_embeddings_cache:
                                del person_embeddings_cache[ns_id]
                    except Exception as e:
                        print(f"Error processing shared profile message for user {uid}: {e}. Payload: {msg}")
                        continue

Comment on lines 1390 to 1402
if action == 'add':
# reload shared people
shared = user_db.get_shared_people(uid)
for s in shared:
sid = s.get('id')
emb = s.get('speaker_embedding')
name = s.get('name') or f"Shared-{sid}"
if emb:
ns_id = f"shared_{sid}"
person_embeddings_cache[ns_id] = {
'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
'name': name,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation for handling the add action in the Redis listener is inefficient. It re-fetches and re-processes all shared profiles from the database every time a single new profile is shared. This can lead to unnecessary database load and processing, especially if a user has many shared profiles.

A more efficient approach would be to include the new profile's data (including the embedding) directly in the Redis message payload. This would eliminate the need for a database query in the listener.

1. In backend/routers/speech_profile.py:
Modify the share_speech_profile_with_user function to include the embedding in the payload:

# ...
person_name = name or (person_doc.get('name') if person_doc else 'Unknown')
add_shared_person_to_user(target_uid, uid, person_name, embedding or [], profile_url)

# Notify target user's active sessions via Redis pubsub
try:
    channel = f'users:{target_uid}:shared_profiles'
    payload = {
        'action': 'add',
        'source_uid': uid,
        'name': person_name,
        'speaker_embedding': embedding or []
    }
    redis_db.r.publish(channel, json.dumps(payload))
# ...

2. In backend/routers/transcribe.py:
Update the _shared_profiles_listener to use the data from the payload directly:

# ...
if action == 'add':
    source_uid = payload.get('source_uid')
    emb = payload.get('speaker_embedding')
    name = payload.get('name') or f"Shared-{source_uid}"
    if emb and source_uid:
        ns_id = f"shared_{source_uid}"
        person_embeddings_cache[ns_id] = {
            'embedding': np.array(emb, dtype=np.float32).reshape(1, -1),
            'name': name,
        }
# ...

This change will make the real-time update much more efficient.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant