Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions projects/singularity_cinema/compose_video/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ def illustration_pos(t):
preset=self.preset,
write_logfile=False)

logger.info(f'file saved: {output_path}')

if os.path.exists(output_path) and os.path.getsize(output_path) > 1024:
test_clip = mp.VideoFileClip(output_path)
actual_duration = test_clip.duration
Expand All @@ -490,6 +492,7 @@ async def execute_code(self, messages, **kwargs):
final_name = 'final_video.mp4'
final_video_path = os.path.join(self.work_dir, final_name)
if os.path.exists(final_video_path):
logger.info(f'output: {final_video_path}')
return messages
with open(os.path.join(self.work_dir, 'segments.txt'), 'r') as f:
segments = json.load(f)
Expand Down
12 changes: 6 additions & 6 deletions webui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ MS-Agent WebUI后端负责管理前端与ms-agent框架之间的通信,通过W
┌──────────────────┐ ┌──────────────────────┐
│Project Discovery │ │ Agent Runner │
└──────────────────┘ └──────────┬───────────┘
┌──────────────────┐ ┌──────────────────────┐
│Project Discovery │ │ Agent Runner │
└──────────────────┘ └──────────┬───────────┘
┌──────────────────┐ ┌──────────────────────┐
│ Session Manager │ │ ms-agent Process │
└──────────────────┘ └──────────────────────┘
┌──────────────────┐ ┌──────────────────────┐
│ Session Manager │ │ ms-agent Process │
└──────────────────┘ └──────────────────────┘
```

### 文件职责
Expand Down
224 changes: 166 additions & 58 deletions webui/backend/agent_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, Optional


Expand Down Expand Up @@ -41,29 +42,28 @@ def __init__(self,
self._workflow_steps = []
self._stop_requested = False

base_dir = Path(__file__).resolve().parents[
1] # equal to dirname(dirname(__file__))
work_dir = base_dir / 'work_dir' / str(session_id)
work_dir.mkdir(parents=True, exist_ok=True)
self.work_dir = work_dir

async def start(self, query: str):
"""Start the agent"""
try:
self._stop_requested = False
self.is_running = True
self._stop_requested = False
self.is_running = True
sent_terminal_event = False

# Build command based on project type
try:
cmd = self._build_command(query)
env = self._build_env()

print('[Runner] Starting agent with command:')
print(f"[Runner] {' '.join(cmd)}")
print(f"[Runner] Working directory: {self.project['path']}")

# Log the command
if self.on_log:
self.on_log({
'level': 'info',
'message': f'Starting agent: {" ".join(cmd[:5])}...',
'timestamp': datetime.now().isoformat()
})

# Start subprocess
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
Expand All @@ -73,17 +73,65 @@ async def start(self, query: str):
cwd=self.project['path'],
start_new_session=True)

print(f'[Runner] Process started with PID: {self.process.pid}')
read_exc = None
try:
await self._read_output()
except Exception as e:
# Do not terminate the workflow immediately when an exception occurs while reading output.
# Record it first; later still wait and return the rc.
read_exc = e
if self.on_log:
self.on_log({
'level': 'error',
'message': f'Read output failed: {repr(e)}',
'timestamp': datetime.now().isoformat()
})

# Start output reader
await self._read_output()
rc = await self.process.wait()
self.is_running = False

except Exception as e:
print(f'[Runner] ERROR: {e}')
if self._stop_requested:
if self.on_error:
self.on_error({
'message': 'Stopped by user',
'returncode': rc
})
sent_terminal_event = True
return

if rc == 0:
if self.on_complete:
self.on_complete({'returncode': 0})
sent_terminal_event = True
else:
# Non-zero must be returned.
msg = f'Process exited with code: {rc}'
if read_exc:
msg += f' (and output reader failed: {repr(read_exc)})'
if self.on_error:
self.on_error({'message': msg, 'returncode': rc})
sent_terminal_event = True

except Exception:
self.is_running = False
import traceback
traceback.print_exc()
tb = traceback.format_exc()
print(tb)
if self.on_error:
self.on_error({'message': str(e), 'type': 'startup_error'})
self.on_error({
'message': tb,
'type': 'startup_error',
'returncode': -1
})
sent_terminal_event = True

finally:
# Fallback: ensure the frontend always receives the completion signal under all circumstances.
if not sent_terminal_event and self.on_error:
self.on_error({
'message': 'Agent terminated unexpectedly',
'returncode': -1
})

async def stop(self):
"""Stop the agent"""
Expand Down Expand Up @@ -142,7 +190,7 @@ def _build_command(self, query: str) -> list:
# Use ms-agent CLI command (installed via entry point)
cmd = [
'ms-agent', 'run', '--config', config_file,
'--trust_remote_code', 'true'
'--trust_remote_code', 'true', '--output_dir', self.work_dir
]

if query:
Expand Down Expand Up @@ -181,60 +229,120 @@ def _build_env(self) -> Dict[str, str]:
return env

async def _read_output(self):
"""Read and process output from the subprocess"""
"""Read and process output from the subprocess (robust, no readline limit issue)"""
print('[Runner] Starting to read output...')

# Parameters related to reading output: can be adjusted as needed.
CHUNK_SIZE = 4096
# For single-line / no-newline output, buffer up to this size;
# if it exceeds the limit, flush in slices to avoid memory blow-up.
MAX_LINE_BUFFER = 256 * 1024

buf = bytearray()
return_code = None

try:
while self.is_running and self.process and self.process.stdout:
line = await self.process.stdout.readline()
if not line:
chunk = await self.process.stdout.read(CHUNK_SIZE)
if not chunk:
print('[Runner] No more output, breaking...')
break

text = line.decode('utf-8', errors='replace').rstrip()
print(f'[Runner] Output: {text[:200]}'
if len(text) > 200 else f'[Runner] Output: {text}')
buf.extend(chunk)

# Split out complete lines (delimited by '\n').
while True:
nl = buf.find(b'\n')
if nl == -1:
# No newline but the buffer is too large:
# force-flush it as a "line" to _process_line to avoid unbounded growth.
if len(buf) >= MAX_LINE_BUFFER:
part = bytes(buf[:MAX_LINE_BUFFER])
del buf[:MAX_LINE_BUFFER]
text = part.decode(
'utf-8', errors='replace').rstrip()
print(f'[Runner] Output(partial): {text[:200]}'
if len(text) > 200 else
f'[Runner] Output(partial): {text}')
await self._process_line(text)
break

line_bytes = bytes(buf[:nl + 1])
del buf[:nl + 1]

text = line_bytes.decode(
'utf-8', errors='replace').rstrip()
print(f'[Runner] Output: {text[:200]}'
if len(text) > 200 else f'[Runner] Output: {text}')
await self._process_line(text)

# Flush: the final tail chunk that doesn't end with a newline.
if buf:
text = bytes(buf).decode('utf-8', errors='replace').rstrip()
print(f'[Runner] Output(tail): {text[:200]}'
if len(text) > 200 else f'[Runner] Output(tail): {text}')
await self._process_line(text)

# Wait for process to complete
if self.process:
return_code = await self.process.wait()
print(f'[Runner] Process exited with code: {return_code}')

# If stop was requested, do not report as completion/error
if self._stop_requested:
if self.on_log:
self.on_log({
'level': 'info',
'message': 'Agent stopped by user',
'timestamp': datetime.now().isoformat()
})
return

if return_code == 0:
if self.on_complete:
self.on_complete({
'status':
'success',
'message':
'Agent completed successfully'
})
else:
if self.on_error:
self.on_error({
'message': f'Agent exited with code {return_code}',
'type': 'exit_error',
'code': return_code
})
buf.clear()

except Exception as e:
# Note: do not end the entire workflow just because of an exception while reading output;
# still call wait to obtain the exit code and return it.
print(f'[Runner] Read error: {e}')
import traceback
traceback.print_exc()

# Record the output-reading error as a log/error message first,
# but ultimately rely on the process exit code (if available).
if not self._stop_requested and self.on_error:
self.on_error({'message': str(e), 'type': 'read_error'})

finally:
self.is_running = False
print('[Runner] Finished reading output')
# Always call wait no matter what, to ensure the exit code is returned to the frontend
# (preventing it from loading indefinitely).
try:
if self.process:
return_code = await self.process.wait()
print(f'[Runner] Process exited with code: {return_code}')

if self._stop_requested:
if self.on_log:
self.on_log({
'level': 'info',
'message': 'Agent stopped by user',
'timestamp': datetime.now().isoformat()
})
else:
if return_code == 0:
if self.on_complete:
self.on_complete({
'status': 'success',
'message': 'Agent completed successfully',
'code': 0
})
else:
# Key: if the code is non-zero, it must be returned (error code)
# so the frontend can reach a terminal state.
if self.on_error:
self.on_error({
'message':
f'Agent exited with code {return_code}',
'type': 'exit_error',
'code': return_code
})
except Exception as e:
print(f'[Runner] Wait/Finalize error: {e}')
import traceback
traceback.print_exc()
if not self._stop_requested and self.on_error:
# Provide a fallback error code if `wait` fails as well.
self.on_error({
'message': str(e),
'type': 'finalize_error',
'code': -1
})
finally:
self.is_running = False
print('[Runner] Finished reading output')

async def _process_line(self, line: str):
"""Process a line of output"""
Expand Down
Loading