Skip to content

Python package: Reconstructing text by streaming from an Iterable with optional termination criteria

License

Notifications You must be signed in to change notification settings

DrMicrobit/dm-streamvalve

Repository files navigation

pylint workflow pytest workflow

dm-streamvalve

Python package to reconstruct text as a string from an Iterable with optional stopping criteria.

Nothing stellar, initially developed to work with the Ollama Python module to be able to stop the output after, e.g. two paragraphs. Or to stop the output when a model got stuck in an endless loop, constantly repeating some lines over and over again.

Stopping criteria can be:

  • Maximum number of lines encountered
  • Maximum number of paragraphs encountered. Paragraphs being text blocks separated by one or several blank lines
  • Maximum number of lines with exact copies encountered previously
  • Maximum number of tokens encountered in a line

Installation

If you haven't done so already, please install uv as this Python package and project manager basically makes all headaches of Python package management go away in an instant.

With uv, adding dm-streamvalve to your project is as easy as

uv add dm-streamvalve

Usage

You need to import StreamValve, instantiate a StreamValve object, and then call process() to start processing the stream. Results, including information on the reason for the processing having stopped, are returned in a dictionary

Processing can be restarted and will return either the next results of the stream or the last result if the processing had been stopped because the stream was exhausted.

from dm_streamvalve.streamvalve import StreamValve

sv = StreamValve(...)
result = sv.process()
print(result['text'])
print(result['stopmsg'])

result = sv.process()
...

The StreamValve object

Allows to process an iterable of anything to reconstruct as string the text contained within.

Has optional callbacks to extract text from each element, as well as callbacks once a token or a line has been accepted into the result.

Allows early stopping on repeated lines, maximum number of lines, maximum number of paragraphs, or a termination signal from the callback.

StreamValve(ostream, callback_extract = None, callback_token = None, callback_line = None, max_linerepeats = 0, max_lines = 0, max_paragraphs = 0, max_linetokens=0)

Required args:

  • ostream : Iterable
    Iterable of Any to reconstruct the text from.

Optional args:

  • callback_extract : Callable[[Any], str | None] If None, calls 'str()' on each element of the iterable to get next string of the stream return type.
    If not None, calls the function to extract the string
    If return value is None instead of a str, leads to early termination. Useful for, e.g., Ollama where each element in the ostream is of type ollama.ChatResponse(), and the string of that is in ["message"]["content"] of each element
  • callback_token : Callable[[str], None] Each time an element of the stream has been added to the result, i.e., it did not lead to termination, this callback is called if not None.
    Can be used, e.g., to stream the processing as it happens.
    Unfortunately, the repeated line termination will have been streamed.
  • callback_line : Callable[[str], None] Similar to callback token, but for each completed line. If the line did not trigger max_linerepeats, this callback is called.
    Can be used, e.g., to stream only fully accepted lines.
  • max_linerepeats : int Maximum number of line repeats allowed. Defaults to 0 (no limit).
  • max_lines : int Maximum number of lines allowed. Defaults to 0 (no limit).
  • max_paragraphs : int Maximum number of paragraphs allowed. Defaults to 0 (no limit).
  • max_linetokens : int Maximum number of tokens allowed in a single line. Defaults to 0 (no limit).

The process() function

Reads items from the Iterable of the StreamValve and returns a dict containing reconstructed text, number of lines, number of paragraphs, and stop criterion and the string stopped at if an early termination occurred.

Returns: An object of type StreamData, which is of type dict[str, Any] The following fields will be defined and set:

  • "text": str, # The text reconstructed as string
  • "num_tokens": int, # Number of tokens processed
  • "num_lines": int, # Number of lines in the reconstructed text
  • "num_paragraphs": int, # Number of paragraphs in the reconstructed text
  • "stopcrit": StopReason, # Enum, the reason why processing of the stream stopped
  • "stopmsg": str, # a human readable message for the above stop criterion
  • "stopat": None | str, # token that led to stop of processing or None if reason was end of stream

If termination was initiated by callable() returning None, stopat may be None if the signal by the callable was the only reason for stopping, else it contains the token/string which led to termination.

Usage examples

Full example 1: get complete stream

This example shows streaming a complete Iterable, from start to end. In this case a list of strings.

from dm_streamvalve.streamvalve import StreamValve

demotext = [
    "Hello\nWorld\n",
    "\nNice day for fishin', eh?",
    "\n",
    "\n\n",
    "\nFind that reference :-)\n",
]

sv = StreamValve(demotext)
print(sv.process()["text"])

This will print:

Hello
World

Nice day for fishin', eh?



Find that reference :-)

Example 2: Stopping criteria

Here, a stopping criterion is set to have at max 2 paragraphs.

sv = StreamValve(demotext, max_paragraphs=2)
print(sv.process()["text"])

This will print:

Hello
World

Nice day for fishin', eh?



Note

The newlines at the end are part of the result as process() will stop only at the start of the next paragraph ("Find ...")

Example 3: Restart reading stream after stopping

This example shows

  • stopping at a repeated line. Here, max_linerepeats=3 means: on the 4th apparition of a line already seen before, processing stops, the 4th repetition is not part of the result.
  • one can continue the processing after an early stop.
  • a string is also an iterable
sv = StreamValve(
    """Here are african animals:

- Zebra
- Lion
- Zebra
- Elephant
- Zebra
- Gnu
- Zebra
- Antelope
""",
    max_linerepeats=3, # include max 3 copies if identical lines
)

print(sv.process()["text"])

print("*** Above are the first animals, from Zebra to Gnu, as the 4th Zebra triggered a stop.")
print("*** You can continue the processing, the Zebra which triggered the stop will be 1st.")

print(sv.process()["text"])

This will print:

Here are african animals:

- Zebra
- Lion
- Zebra
- Elephant
- Zebra
- Gnu

*** Above are the first animals, from Zebra to Gnu, as the 4th Zebra triggered a stop.
*** You can continue the processing, the Zebra which triggered the stop will be 1st.
- Zebra
- Antelope

Note

Restarting processing resets the stopping criteria. E.g., in the example above but with a longer text, process() would read the stream again until it counted 3 more 'Zebra' and then encountered a 4th.

Full Example 4: reconstructing text from streams of arbitrary type, e.g., Ollama ChatResponse

This example shows how to:

  • monitor the output of Ollama on stdout as it is generated via having callback_token point to a function (here: monitor)
  • extract the text from every element of the Ollama ChatResponse stream to make it available to StreamValve via callback_extract.
  • setting multiple stopping criteria as fail-safe

Important

For the code below to work, you need to have (1) Ollama installed and running, the llama3.1 model installed in Ollama (ollama pull llama3.1), and (3) your Python project needs to have the Ollama Python module installed via, e.g., uv add ollama.

import ollama
from dm_streamvalve.streamvalve import StreamValve

def extract_chat_response(cr: ollama.ChatResponse) -> str:
    """Ollama ChatResponse `cr` is a dictionary of dictionaries, where the text of the
    current token is in cr["message"]["content"]"""
    return cr["message"]["content"]

def monitor(s: str):
    """Callback for streamvalve to monitor chat response"""
    print(s, end="", flush=True)

ostream = ollama.chat(
    model="llama3.1",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Name 50 animals in a dashed list. One per line."},
    ],
    stream=True,
)

sv = StreamValve(
    ostream,
    callback_extract=extract_chat_response,
    callback_token=monitor,
    max_linerepeats=3, # include max 3 copies if identical lines
    max_lines=200,     # max of 200 lines
)

sv.process()

This should result in an output on stdout similar to this one:

- Lion
- Elephant
- Gorilla
- Kangaroo
...

Notes

The GitHub repository comes with all files I currently use for Python development across multiple platforms. Notably:

  • configuration of the Python environment via uv: pyproject.toml and uv.lock
  • configuration for linter and code formatter ruff: ruff.toml
  • configuration for pylint: .pylintrc
  • git ignore files: .gitignore
  • configuration for pre-commit: .pre-commit-config.yaml. The script used to check git commit summary message is in devsupport/check_commitsummary.py
  • configuration for VSCode editor: .vscode directory

About

Python package: Reconstructing text by streaming from an Iterable with optional termination criteria

Resources

License

Stars

Watchers

Forks

Packages

No packages published