Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/lint-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4

- name: Setup Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: 3.11
python-version: "3.11"

- name: Install poetry
run: |
Expand All @@ -25,7 +25,7 @@ jobs:
python -m poetry config virtualenvs.in-project true

- name: Cache the virtualenv
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ./.venv
key: ${{ runner.os }}-venv-${{ hashFiles('**/poetry.lock') }}
Expand Down
205 changes: 203 additions & 2 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "surfkit"
version = "0.1.393"
version = "0.1.394"
description = "A toolkit for building AI agents that use devices"
authors = ["Patrick Barker <patrickbarkerco@gmail.com>", "Jeffrey Huckabay <jfhucka@gmail.com>"]
license = "MIT"
Expand Down
235 changes: 56 additions & 179 deletions surfkit/runtime/agent/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
import logging
import os
import signal
import socket
import subprocess
import sys
import threading
import traceback
import urllib.error
import urllib.parse
import urllib.request
from typing import Dict, Iterator, List, Literal, Optional, Tuple, Type, Union

import httpx


def custom_thread_excepthook(args):
# Format the traceback
Expand All @@ -34,14 +32,10 @@ def custom_thread_excepthook(args):
from google.cloud import container_v1
from google.oauth2 import service_account
from kubernetes import client, config
from kubernetes.client import Configuration
from kubernetes.client.api import core_v1_api
from kubernetes.client.rest import ApiException
from kubernetes.stream import portforward
from mllm import Router
from namesgenerator import get_random_name
from pydantic import BaseModel
from taskara.server.models import V1Task
from tenacity import retry, stop_after_attempt, wait_fixed

from surfkit.server.models import (
Expand Down Expand Up @@ -75,14 +69,9 @@ def my_proxy_override(self, *args, **kwargs):

ws_client.PortForward._proxy = my_proxy_override


logger = logging.getLogger(__name__)


class APIOpts(BaseModel):
url: str


class GKEOpts(BaseModel):
cluster_name: str
region: str
Expand All @@ -94,27 +83,21 @@ class LocalOpts(BaseModel):


class KubeConnectConfig(BaseModel):
provider: Literal["api", "gke", "local"] = "local"
provider: Literal["gke", "local"] = "local"
namespace: str = "default"
api_opts: Optional[APIOpts] = None
gke_opts: Optional[GKEOpts] = None
local_opts: Optional[LocalOpts] = None
branch: Optional[str] = None


class KubeAgentRuntime(AgentRuntime["KubeAgentRuntime", KubeConnectConfig]):
"""A container runtime that uses Kubernetes to manage Pods directly"""

def __init__(self, cfg: Optional[KubeConnectConfig] = None) -> None:
# Load the Kubernetes configuration, typically from ~/.kube/config
if not cfg:
cfg = KubeConnectConfig()
self.cfg = cfg
if cfg.provider == "api":
opts = cfg.api_opts
if not opts:
raise ValueError("API opts missing")
self.connect_to_api(opts)
elif cfg.provider == "gke":
self.cfg = cfg or KubeConnectConfig()

self.kubeconfig = None
if cfg.provider == "gke":
opts = cfg.gke_opts
if not opts:
raise ValueError("GKE opts missing")
Expand All @@ -125,14 +108,19 @@ def __init__(self, cfg: Optional[KubeConnectConfig] = None) -> None:
opts = LocalOpts()
if opts.path:
config.load_kube_config(opts.path)
self.kubeconfig = opts.path
else:
raise ValueError("Unsupported provider: " + cfg.provider)

self.core_api = core_v1_api.CoreV1Api()
self.core_api = client.CoreV1Api()

self.namespace = cfg.namespace

self.subprocesses = []
self.setup_signal_handlers()

self.branch = cfg.branch

@classmethod
def name(cls) -> str:
return "kube"
Expand Down Expand Up @@ -299,13 +287,6 @@ def connect_config(self) -> KubeConnectConfig:
def connect(cls, cfg: KubeConnectConfig) -> "KubeAgentRuntime":
return cls(cfg)

def connect_to_api(self, opts: APIOpts) -> Tuple[client.CoreV1Api, str, str]:
configuration = client.Configuration()
configuration.host = opts.url
api_client = client.ApiClient(configuration)
v1_client = client.CoreV1Api(api_client)
return v1_client, "unknown", "anonymous"

@retry(stop=stop_after_attempt(15))
def connect_to_gke(self, opts: GKEOpts) -> Tuple[client.CoreV1Api, str, str]:
"""
Expand Down Expand Up @@ -451,155 +432,51 @@ def call(
data: Optional[dict] = None,
headers: Optional[dict] = None,
) -> Tuple[int, str]:
c = Configuration.get_default_copy()
c.assert_hostname = False # type: ignore
Configuration.set_default(c)
core_v1 = client.CoreV1Api()
##############################################################################
# Kubernetes pod port forwarding works by directly providing a socket which
# the python application uses to send and receive data on. This is in contrast
# to the go client, which opens a local port that the go application then has
# to open to get a socket to transmit data.
#
# This simplifies the python application, there is not a local port to worry
# about if that port number is available. Nor does the python application have
# to then deal with opening this local port. The socket used to transmit data
# is immediately provided to the python application.
#
# Below also is an example of monkey patching the socket.create_connection
# function so that DNS names of the following formats will access kubernetes
# ports:
#
# <pod-name>.<namespace>.kubernetes
# <pod-name>.pod.<namespace>.kubernetes
# <service-name>.svc.<namespace>.kubernetes
# <service-name>.service.<namespace>.kubernetes
#
# These DNS name can be used to interact with pod ports using python libraries,
# such as urllib.request and http.client. For example:
#
# response = urllib.request.urlopen(
# 'https://metrics-server.service.kube-system.kubernetes/'
# )
#
##############################################################################

# Monkey patch socket.create_connection which is used by http.client and
# urllib.request. The same can be done with urllib3.util.connection.create_connection
# if the "requests" package is used.
socket_create_connection = socket.create_connection

def kubernetes_create_connection(address, *args, **kwargs):
dns_name = address[0]
if isinstance(dns_name, bytes):
dns_name = dns_name.decode()
dns_name = dns_name.split(".")
if dns_name[-1] != "kubernetes":
return socket_create_connection(address, *args, **kwargs)
if len(dns_name) not in (3, 4):
raise RuntimeError("Unexpected kubernetes DNS name.")
namespace = dns_name[-2]
name = dns_name[0]
port = address[1]
# print("connecting to: ", namespace, name, port)
if len(dns_name) == 4:
if dns_name[1] in ("svc", "service"):
service = core_v1.read_namespaced_service(name, namespace)
for service_port in service.spec.ports: # type: ignore
if service_port.port == port:
port = service_port.target_port
break
else:
raise RuntimeError(f"Unable to find service port: {port}")
label_selector = []
for key, value in service.spec.selector.items(): # type: ignore
label_selector.append(f"{key}={value}")
pods = core_v1.list_namespaced_pod(
namespace, label_selector=",".join(label_selector)
)
if not pods.items:
raise RuntimeError("Unable to find service pods.")
name = pods.items[0].metadata.name
if isinstance(port, str):
for container in pods.items[0].spec.containers:
for container_port in container.ports:
if container_port.name == port:
port = container_port.container_port
break
else:
continue
break
else:
raise RuntimeError(
f"Unable to find service port name: {port}"
)
elif dns_name[1] != "pod":
raise RuntimeError(f"Unsupported resource type: {dns_name[1]}")
pf = portforward(
core_v1.connect_get_namespaced_pod_portforward,
name,
namespace,
ports=str(port),
data = data or {}
headers = headers or {}

workload_proxy_url = os.getenv("WORKLOAD_PROXY_URL")
if workload_proxy_url is not None:
print("Using workload proxy at", workload_proxy_url)
client_cert = os.getenv("WORKLOAD_PROXY_CLIENT_CERT")
client_key = os.getenv("WORKLOAD_PROXY_CLIENT_KEY")
ca_cert = os.getenv("WORKLOAD_PROXY_CA_CERT")

workload_proxy_client = httpx.Client(
verify=ca_cert, cert=(client_cert, client_key)
)
return pf.socket(port)

socket.create_connection = kubernetes_create_connection

namespace = self.namespace
if not namespace:
raise ValueError("NAMESPACE environment variable not set")
# Access the nginx http server using the
# "<pod-name>.pod.<namespace>.kubernetes" dns name.
# Construct the URL with the custom path
print(
f"connecting to: {name.lower()}.pod.{namespace}.kubernetes:{port}{path}",
flush=True,
)
url = f"http://{name.lower()}.pod.{namespace}.kubernetes:{port}{path}"

# Create a request object based on the HTTP method
if method.upper() == "GET":
if data:
# Convert data to URL-encoded query parameters for GET requests
query_params = urllib.parse.urlencode(data)
url += f"?{query_params}"
request = urllib.request.Request(url)

merged_headers = {
**headers,
"X-Pod-Name": name,
"X-Namespace": self.cfg.namespace,
"X-Port": str(port),
}
else:
# Set the request method and data for POST, PUT, etc.
request = urllib.request.Request(url, method=method.upper())
if data:
# Convert data to JSON string and set the request body
request.add_header("Content-Type", "application/json")
if headers:
for k, v in headers.items():
request.add_header(k, v)
request.data = json.dumps(data).encode("utf-8")
logger.debug(f"Request Data: {request.data}")

# Send the request and handle the response
try:
response = urllib.request.urlopen(request)
status_code = response.code
response_text = response.read().decode("utf-8")
logger.debug(f"Status Code: {status_code}")

# Parse the JSON response and return a dictionary
return status_code, response_text
except urllib.error.HTTPError as e:
status_code = e.code
error_message = e.read().decode("utf-8")
logger.error(f"Error: {status_code}")
logger.error(error_message)

raise SystemError(
f"Error making http request kubernetes pod {status_code}: {error_message}"
print("Using direct connection to workload service")
workload_proxy_client = httpx.Client()
merged_headers = headers
workload_proxy_url = (
f"http://{name}.{self.cfg.namespace}.svc.cluster.local:{port}"
)
finally:
try:
if response: # type: ignore
response.close()
except:
pass

json_data = None if method == "GET" else data
query_parameters = ""
if method == "GET" and data:
query_parameters = "?" + "&".join([f"{k}={v}" for k, v in data.items()])

url = f"{workload_proxy_url.rstrip('/')}/{path.lstrip('/')}" + query_parameters

print("Method: ", method)
print("URL: ", url)
print("Headers: ", merged_headers)
print("JSON Data: ", json_data)

r = workload_proxy_client.request(
method=method, url=url, headers=merged_headers, json=json_data
)

return r.status_code, r.text

def setup_signal_handlers(self):
signal.signal(signal.SIGINT, self.graceful_exit)
Expand Down
Loading