scripts/scheduled/reindex_elasticsearch_cronjob.py: 105 changes (77 additions, 28 deletions)
@@ -30,6 +30,9 @@
import django
django.setup()

from django.contrib.auth.models import User
from django.conf import settings

from sefaria.model import *
from sefaria.search import index_all, get_new_and_current_index_names, index_client, es_client, TextIndexer, setup_logging
from sefaria.local_settings import SEFARIA_BOT_API_KEY
@@ -149,29 +152,23 @@ def detailed_failure_report(self) -> str:
if self.failed_text_versions:
lines.append(f"\nFailed Text Versions: {len(self.failed_text_versions)}")
lines.append("-" * 40)
for i, failure in enumerate(self.failed_text_versions[:50], 1):
for i, failure in enumerate(self.failed_text_versions, 1):
Contributor comment:

I don't understand this change or what the original code intended. Why were we stopping at 50 failed_text_versions before and why are we no longer doing this? Same question about only looking at 20 skipped_text_versions below.
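
For reference, a minimal sketch of what the removed truncation appears to have done: cap the per-item detail (presumably to keep the report bounded) and summarize the remainder. The constant and function names below are hypothetical, not from the codebase.

```python
# Hypothetical names; the original code hard-coded the 50 (failed) and 20 (skipped) caps inline.
MAX_FAILED_IN_REPORT = 50

def format_failed_versions(failed_text_versions, cap=MAX_FAILED_IN_REPORT):
    """Return report lines for at most `cap` failures, then summarize the rest."""
    lines = []
    for i, failure in enumerate(failed_text_versions[:cap], 1):
        title = failure.get('title', 'Unknown')
        version = failure.get('version', 'Unknown')
        lines.append(f"{i}. {title} ({version})")
    if len(failed_text_versions) > cap:
        lines.append(f"... and {len(failed_text_versions) - cap} more")
    return lines
```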

title = failure.get('title', 'Unknown')
version = failure.get('version', 'Unknown')
lang = failure.get('lang', 'Unknown')
error_type = failure.get('error_type', 'Unknown')
error = failure.get('error', 'Unknown error')[:100]
error = failure.get('error', 'Unknown error')
lines.append(f"{i}. {title} ({version}, {lang})")
lines.append(f" Error: {error_type}: {error}")

if len(self.failed_text_versions) > 50:
lines.append(f"... and {len(self.failed_text_versions) - 50} more")

if self.skipped_text_versions:
lines.append(f"\nSkipped Text Versions: {len(self.skipped_text_versions)}")
lines.append("-" * 40)
for i, skip in enumerate(self.skipped_text_versions[:20], 1):
for i, skip in enumerate(self.skipped_text_versions, 1):
title = skip.get('title', 'Unknown')
version = skip.get('version', 'Unknown')
reason = skip.get('reason', 'Unknown')
lines.append(f"{i}. {title} ({version}) - {reason}")

if len(self.skipped_text_versions) > 20:
lines.append(f"... and {len(self.skipped_text_versions) - 20} more")

lines.append(SEPARATOR_LINE)
return "\n".join(lines)
@@ -190,11 +187,69 @@ def check_elasticsearch_connection() -> bool:
return False


def check_django_database_connection() -> bool:
"""
Verify Django database (PostgreSQL) is reachable and properly configured.

This is critical because sheet indexing requires Django User lookups.
If the database is unreachable, UserProfile falls back to "User {id}" names,
which corrupts the search index with useless data.

Returns:
True if database is properly configured and reachable, False otherwise.
"""
# First, check if database credentials are configured
db_config = settings.DATABASES.get('default', {})
db_user = db_config.get('USER')
db_password = db_config.get('PASSWORD')
db_host = db_config.get('HOST')
db_port = db_config.get('PORT')
db_name = db_config.get('NAME')

# Log configuration (without password)
logger.debug(f"Django database config - host: {db_host}, port: {db_port}, "
f"name: {db_name}, user: {db_user}, password_set: {bool(db_password)}")

# Check for missing credentials (os.getenv returns None when env var is missing)
missing_fields = []
if not db_user:
missing_fields.append("USER (DATABASES_USER env var)")
if not db_password:
missing_fields.append("PASSWORD (DATABASES_PASSWORD env var)")
if not db_host:
missing_fields.append("HOST (DATABASES_HOST env var)")
if not db_port:
missing_fields.append("PORT (DATABASES_PORT env var)")

if missing_fields:
logger.error(f"Django database configuration incomplete - missing: {', '.join(missing_fields)}")
logger.error("This typically means the 'local-settings-secret' Kubernetes secret is missing or misconfigured.")
logger.error("Sheet indexing will produce 'User {id}' names instead of real names!")
return False

# Now try to actually connect and query
try:
# Simple query to verify connection works
user_count = User.objects.count()
logger.debug(f"Django database connection verified - user_count: {user_count}")

# Extra sanity check: verify we can get a real user name
first_user = User.objects.first()
if first_user:
logger.debug(f"Sample user lookup successful - id: {first_user.id}, "
f"name: {first_user.first_name} {first_user.last_name}")

return True
except Exception as e:
Contributor comment:

I wonder if, instead of catching a general Exception, we should wrap User.objects.count() and User.objects.first() in separate try/except blocks.
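
A sketch of what that split might look like, assuming the script's existing logger and User import; catching Django's DatabaseError rather than a bare Exception is an additional assumption here, not something this PR does. The benefit is that a failure is attributed to the query that actually raised it, and the non-essential sanity check can be treated as non-fatal.

```python
import logging

from django.contrib.auth.models import User
from django.db import DatabaseError  # assumption: narrow the catch instead of a bare Exception

logger = logging.getLogger(__name__)

def verify_user_queries() -> bool:
    """Sketch: isolate count() and first() so a failure in one is attributed correctly."""
    try:
        user_count = User.objects.count()
        logger.debug(f"Django database connection verified - user_count: {user_count}")
    except DatabaseError as e:
        logger.error(f"User.objects.count() failed - error: {e}", exc_info=True)
        return False

    try:
        first_user = User.objects.first()
        if first_user:
            logger.debug(f"Sample user lookup successful - id: {first_user.id}")
    except DatabaseError as e:
        # count() succeeded, so the connection itself works; treat the sanity check as non-fatal.
        logger.warning(f"User.objects.first() failed - error: {e}", exc_info=True)

    return True
```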

logger.error(f"Django database connection failed - error: {str(e)}", exc_info=True)
logger.error(f"Database config - host: {db_host}, port: {db_port}, name: {db_name}, user: {db_user}")
return False


def check_index_exists(index_name: str) -> bool:
"""Check if an Elasticsearch index exists."""
try:
exists = index_client.exists(index=index_name)
logger.debug(f"Index existence check - index: {index_name}, exists: {exists}")
return exists
except Exception as e:
logger.warning(f"Failed to check index existence - index: {index_name}, error: {str(e)}")
@@ -234,10 +289,6 @@ def log_index_state(index_type: str, result: ReindexingResult):
new_exists = check_index_exists(new_index)
new_count = get_index_doc_count(new_index) if new_exists else 0

logger.debug(f"Index state for {index_type} - alias: {alias}, current_index: {current_index}, "
f"current_doc_count: {current_count}, new_index: {new_index}, "
f"new_index_exists: {new_exists}, new_doc_count: {new_count}")

# Warn if new index already exists with documents
if new_exists and new_count > 0:
result.add_warning(
@@ -251,7 +302,6 @@

def run_pagesheetrank_update(result: ReindexingResult) -> bool:
"""Run pagesheetrank update with error handling."""
logger.debug("Starting pagesheetrank update")
try:
update_pagesheetrank()
result.record_step_success("pagesheetrank_update", "PageSheetRank values updated successfully")
@@ -263,7 +313,6 @@

def run_index_all(result: ReindexingResult) -> bool:
"""Run full index with error handling and failure capture."""
logger.debug("Starting full index rebuild")
try:
index_all()

@@ -305,10 +354,6 @@ def should_retry_with_backoff(attempt: int, max_retries: int, context: str = "")
return False

wait_time = attempt * 30 # Linear backoff (30s, 60s, 90s)
log_msg = f"Retrying in {wait_time} seconds..."
if context:
log_msg = f"{context} - {log_msg}"
logger.debug(log_msg)
time.sleep(wait_time)
return True

@@ -319,14 +364,10 @@ def run_sheets_by_timestamp(timestamp: str, result: ReindexingResult, max_retrie

This catches sheets created/modified during the reindexing process.
"""
logger.debug(f"Starting sheets-by-timestamp API call - timestamp: {timestamp}")

url = "https://www.sefaria.org/admin/index-sheets-by-timestamp"

for attempt in range(1, max_retries + 1):
try:
logger.debug(f"API attempt {attempt}/{max_retries} - url: {url}")

r = requests.post(
url,
data={"timestamp": timestamp, "apikey": SEFARIA_BOT_API_KEY},
@@ -400,7 +441,6 @@ def main():

# Store timestamp before we start (sheets created after this will be caught by API)
last_sheet_timestamp = datetime.now().isoformat()
logger.debug(f"Captured start timestamp for sheet catch-up - timestamp: {last_sheet_timestamp}")

# Pre-flight checks
logger.debug("Running pre-flight checks...")
@@ -411,8 +451,18 @@
logger.info(result.get_summary())
sys.exit(1)
result.record_step_success("preflight_elasticsearch", "Elasticsearch connection verified")

# 2. Log current index states

# 2. Check Django database connection (critical for sheet indexing)
if not check_django_database_connection():
result.record_step_failure("preflight_django_database",
"Cannot connect to Django database. Sheet indexing would produce 'User {id}' names. "
"Check that 'local-settings-secret' Kubernetes secret is properly configured with "
"DATABASES_USER, DATABASES_PASSWORD, DATABASES_HOST, DATABASES_PORT environment variables.")
logger.info(result.get_summary())
sys.exit(1)
result.record_step_success("preflight_django_database", "Django database connection verified")

# 3. Log current index states
try:
log_index_state('text', result)
log_index_state('sheet', result)
@@ -470,7 +520,6 @@ def main():

# Only log final index states if there were failures
if result.failed_text_versions or result.skipped_text_versions or result.steps_failed:
logger.debug("Final index states:")
try:
log_index_state('text', result)
log_index_state('sheet', result)
sefaria/search.py: 48 changes (3 additions, 45 deletions)
@@ -333,13 +333,9 @@ def create_index(index_name, type, force=False):
raise

if type == 'text':
logger.debug(f"Applying text mapping to index - index_name: {index_name}")
put_text_mapping(index_name)
logger.debug(f"Text mapping applied successfully - index_name: {index_name}")
elif type == 'sheet':
logger.debug(f"Applying sheet mapping to index - index_name: {index_name}")
put_sheet_mapping(index_name)
logger.debug(f"Sheet mapping applied successfully - index_name: {index_name}")
else:
logger.warning(f"Unknown type, no mapping applied - type: {type}, index_name: {index_name}")

@@ -558,7 +554,6 @@ def traverse(mini_toc):
r = Ref(title)
except InputError:
parse_errors.append(title)
logger.debug(f"Failed to parse ref - title: {title}")
return
vlist = cls.get_ref_version_list(r)
vpriorities = defaultdict(lambda: 0)
@@ -602,9 +597,6 @@ def get_all_versions(cls, tries=0, versions=None, page=0):
versions += temp_versions
page += 1
first_run = False
# Log progress every 100 pages
if page % PROGRESS_LOG_EVERY_N == 0:
logger.debug(f"Fetching versions - page: {page}, total_so_far: {len(versions)}")
logger.debug(f"Completed fetching all versions - total: {len(versions)}")
return versions
except pymongo.errors.AutoReconnect as e:
@@ -900,11 +892,9 @@ def index_sheets_by_timestamp(timestamp):

name_dict = get_new_and_current_index_names('sheet', debug=False)
curr_index_name = name_dict.get('current')
logger.debug(f"Using sheet index - index_name: {curr_index_name}")

try:
ids = db.sheets.find({"status": "public", "dateModified": {"$gt": timestamp}}).distinct("id")
logger.debug(f"Found sheets to index by timestamp - count: {len(ids)}, timestamp: {timestamp}")
except Exception as e:
logger.error(f"Error querying sheets by timestamp - timestamp: {timestamp}, error: {str(e)}", exc_info=True)
return str(e)
@@ -939,7 +929,6 @@ def index_public_sheets(index_name):

ids = db.sheets.find({"status": "public"}).distinct("id")
total = len(ids)
logger.debug(f"Found public sheets to index - total: {total}, first_10_ids: {ids[:10] if ids else []}")

succeeded = 0
failed = 0
@@ -974,17 +963,13 @@ def clear_index(index_name):
"""
Delete the search index.
"""
logger.debug(f"Attempting to delete Elasticsearch index - index_name: {index_name}")
try:
# Check if index exists before trying to delete
if index_client.exists(index=index_name):
index_client.delete(index=index_name)
logger.debug(f"Successfully deleted Elasticsearch index - index_name: {index_name}")
else:
logger.debug(f"Index does not exist, nothing to delete - index_name: {index_name}")
except NotFoundError:
# Index doesn't exist - handle race condition where index is deleted between exists() check and delete() call
logger.debug(f"Index not found when attempting to delete - index_name: {index_name}")
pass
Contributor comment:

Curious why we are removing so many logger statements. Couldn't they be useful? Especially here: it seems bad to have a bare "pass" in an exception handler.
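
If the concern is the bare `pass`, one option is to keep at least the debug line in the NotFoundError branch. This is only a sketch: it reuses `index_client`, `logger`, and the `NotFoundError` import as they already exist at module level in sefaria/search.py, and it keeps the race-condition handling while still leaving a trace in the logs.

```python
from elasticsearch.exceptions import NotFoundError  # already imported in sefaria/search.py

def clear_index(index_name):
    """Delete the search index, logging (rather than silently passing) when it is already gone."""
    # index_client and logger are the module-level objects defined in sefaria/search.py.
    try:
        if index_client.exists(index=index_name):
            index_client.delete(index=index_name)
    except NotFoundError:
        # Race condition: the index was deleted between the exists() check and delete().
        # A debug log preserves a trace of the race without treating it as an error.
        logger.debug(f"Index not found when attempting to delete - index_name: {index_name}")
    except Exception as e:
        logger.error(f"Error deleting Elasticsearch index - index_name: {index_name} - error: {str(e)}")
```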

except Exception as e:
logger.error(f"Error deleting Elasticsearch index - index_name: {index_name} - error: {str(e)}")

@@ -1099,9 +1084,7 @@ def index_all(skip=0, debug=False):
raise

# Clear index queue
logger.debug("Clearing stale index queue")
deleted = db.index_queue.delete_many({})
logger.debug(f"Cleared index queue - deleted_count: {deleted.deleted_count}")

end = datetime.now()
total_elapsed = end - start
@@ -1122,47 +1105,32 @@ def index_all_of_type(type, skip=0, debug=False):

# Check if new index already exists
new_exists = index_client.exists(index=index_names_dict.get('new'))
if new_exists:
try:
stats = index_client.stats(index=index_names_dict.get('new'))
doc_count = stats.get('_all', {}).get('primaries', {}).get('docs', {}).get('count', 0)
logger.debug(f"New index already exists, will be recreated - index: {index_names_dict.get('new')}, existing_doc_count: {doc_count}")
except Exception:
logger.debug(f"New index already exists, will be recreated - index: {index_names_dict.get('new')}")

# Countdown (keeping for backwards compatibility, but logging instead of just printing)
logger.debug("Starting countdown before indexing...")
for i in range(10):
remaining = 10 - i
logger.debug(f'STARTING IN T-MINUS {remaining}')
logger.debug(f"Countdown - seconds_remaining: {remaining}")
pytime.sleep(1)

# Perform the actual indexing
logger.debug(f"Beginning indexing operation - type: {type}, index_name: {index_names_dict.get('new')}")
index_all_of_type_by_index_name(type, index_names_dict.get('new'), skip, debug)

# Switch aliases
logger.debug("Switching aliases after indexing")
try:
index_client.delete_alias(index=index_names_dict.get('current'), name=index_names_dict.get('alias'))
logger.debug(f"Successfully deleted alias from old index - alias: {index_names_dict.get('alias')}, old_index: {index_names_dict.get('current')}")
except NotFoundError:
logger.debug(f"Alias not found on old index (may be first run) - alias: {index_names_dict.get('alias')}, old_index: {index_names_dict.get('current')}")
# Alias not found on old index (may be first run)
pass

# Clear any index with the alias name
clear_index(index_names_dict.get('alias'))

# Create new alias
index_client.put_alias(index=index_names_dict.get('new'), name=index_names_dict.get('alias'))
logger.debug(f"Successfully created alias for new index - alias: {index_names_dict.get('alias')}, new_index: {index_names_dict.get('new')}")

# Cleanup old index
if index_names_dict.get('new') != index_names_dict.get('current'):
logger.debug(f"Cleaning up old index - old_index: {index_names_dict.get('current')}")
clear_index(index_names_dict.get('current'))

logger.debug(f"Completed index_all_of_type for '{type}' - type: {type}, final_index: {index_names_dict.get('new')}, alias: {index_names_dict.get('alias')}")


def index_all_of_type_by_index_name(type, index_name, skip=0, debug=False, force_recreate=True):
Expand All @@ -1179,16 +1147,8 @@ def index_all_of_type_by_index_name(type, index_name, skip=0, debug=False, force

# Check if index exists and validate
index_exists = index_client.exists(index=index_name)
if index_exists and skip == 0:
try:
stats = index_client.stats(index=index_name)
doc_count = stats.get('_all', {}).get('primaries', {}).get('docs', {}).get('count', 0)
logger.debug(f"Index already exists before creation - index_name: {index_name}, existing_doc_count: {doc_count}, will_recreate: {force_recreate}")
except Exception as e:
logger.debug(f"Could not check existing index stats - index_name: {index_name}, error: {str(e)}")

if skip == 0:
logger.debug(f"Creating fresh index (skip=0) - index_name: {index_name}, type: {type}")
create_index(index_name, type, force=force_recreate)
else:
logger.info(f"Skipping index creation (resuming) - index_name: {index_name}, skip: {skip}")
Expand All @@ -1203,9 +1163,7 @@ def index_all_of_type_by_index_name(type, index_name, skip=0, debug=False, force
TextIndexer.index_all(index_name, debug=debug)
logger.debug("Completed TextIndexer.index_all")
elif type == 'sheet':
logger.debug("Starting sheet indexing")
index_public_sheets(index_name)
logger.debug("Completed sheet indexing")
else:
logger.error(f"Unknown index type - type: {type}")
raise ValueError(f"Unknown index type: {type}")