Phenomenon
When a user submits a job through Airflow, it runs for a while and then encounters the following error:
[2024-07-19, 12:46:11 CST] {taskinstance.py:1112} DEBUG - <TaskInstance: asset_map.job_identifier manual__2024-06-01T16:12:23+08:00 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-07-19, 12:46:11 CST] {client.py:170} DEBUG - [DeleteNamespaceResource][264c][/apis/batch/v1/namespaces/user-jobs/jobs/job_identifier] State: Streaming
[2024-07-19, 12:46:11 CST] {operations.py:67} INFO - [user-jobs/jobs/job_identifier] deleted
[2024-07-19, 12:46:11 CST] {client.py:170} DEBUG - [DeleteNamespaceResource][264c][/apis/batch/v1/namespaces/user-jobs/jobs/job_identifier] State: Disconnected
[2024-07-19, 12:46:11 CST] {job_runner.py:286} INFO - {job-runner}: Job deleted
[2024-07-19, 12:46:11 CST] {job_runner.py:286} INFO - {job-runner}: Client stopped, execution aborted.
[2024-07-19, 12:46:11 CST] {taskinstance.py:1824} ERROR - Task failed with exception
airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiException: Error while executing query
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/to/airflow_kubernetes_job_operator/kubernetes_job_operator.py", line 463, in execute
rslt = self.job_runner.execute_job(
File "/path/to/airflow_kubernetes_job_operator/job_runner.py", line 451, in execute_job
raise ex
File "/path/to/zthreading/tasks.py", line 176, in _run_as_thread
rslt = self.action(*args, **kwargs)
File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 261, in _execute_query
raise ex
File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 257, in _execute_query
self.query_loop(client)
File "/path/to/airflow_kubernetes_job_operator/kube_api/queries.py", line 466, in query_loop
raise ex
File "/path/to/airflow_kubernetes_job_operator/kube_api/queries.py", line 459, in query_loop
return super().query_loop(client)
File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 446, in query_loop
raise ex from KubeApiException("Error while executing query")
File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 386, in query_loop
for line in self._read_response_stream_lines(response):
File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 212, in _read_response_stream_lines
chunk = chunk.decode("utf8")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe6 in position 16375: unexpected end of data
The job is then deleted, so the failure can no longer be inspected afterwards with kubectl describe job or kubectl describe pod.
Cause of the Issue
The original code retrieves the job context as a byte stream: each chunk is decoded to UTF-8 independently, and the decoded pieces are reassembled into lines.
If the job context description contains Chinese characters such as “开始” (start), which UTF-8 encodes as \xe5\xbc\x80\xe5\xa7\x8b, a chunk boundary can split the sequence into \xe5\xbc\x80\xe5 and \xa7\x8b; the trailing \xe5 is only the first byte of a three-byte character.
Decoding the truncated chunk \xe5\xbc\x80\xe5 as UTF-8 therefore raises UnicodeDecodeError.
Airflow handles the resulting exception by deleting the job outright, which is why no failure details survive for inspection.
The original implementation was:
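The truncation is easy to reproduce in isolation: “开始” is six bytes in UTF-8, and decoding only the first four raises the same error seen in the traceback.

```python
text = "开始"  # "start"
data = text.encode("utf-8")
assert data == b"\xe5\xbc\x80\xe5\xa7\x8b"

# Simulate a chunk boundary that cuts the second character in half.
truncated = data[:4]  # b"\xe5\xbc\x80\xe5"
try:
    truncated.decode("utf-8")
except UnicodeDecodeError as e:
    print(e.reason)        # unexpected end of data
    print(e.start, e.end)  # 3 4 — the error span ends exactly at len(truncated)
```

Note that e.end equals the chunk length here; the fix below relies on exactly this property to tell a chunk-boundary truncation apart from genuinely corrupt data.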
@classmethod
def _read_response_stream_lines(cls, response: HTTPResponse):
    """INTERNAL. Helper yield method. Parses the streaming http response
    to lines (can be async!)

    Yields:
        str: The line
    """
    prev = ""
    for chunk in response.stream(decode_content=False):
        if isinstance(chunk, bytes):
            chunk = chunk.decode("utf8")
        chunk = prev + chunk
        lines = chunk.split("\n")
        if not chunk.endswith("\n"):
            prev = lines[-1]
            lines = lines[:-1]
        else:
            prev = ""
        for line in lines:
            if line:
                yield line

Solutions
- Solution 1: Avoid Chinese (or any other multi-byte) characters in the job configuration context of jobs submitted through Airflow.
- Solution 2: Patch the operator's _read_response_stream_lines so that a multi-byte character split across chunks no longer breaks decoding:
# The idea is to catch the UnicodeDecodeError, check whether it occurred at the
# very end of the chunk (which indicates a chunk-boundary truncation rather than
# corrupt data), hold back the truncated bytes, decode only the complete part,
# and prepend the held-back bytes to the next chunk before decoding it.
@classmethod
def _read_response_stream_lines(cls, response: HTTPResponse):
    """INTERNAL. Helper yield method. Parses the streaming http response
    to lines (can be async!)

    Yields:
        str: The line
    """
    prev = ""
    prev_binary_chunk = b""
    for chunk in response.stream(decode_content=False):
        if isinstance(chunk, bytes):
            chunk = prev_binary_chunk + chunk
            prev_binary_chunk = b""
            try:
                chunk = chunk.decode("utf8")
            except UnicodeDecodeError as e:
                if e.end != len(chunk):
                    # The error is not at the chunk boundary: real corruption.
                    raise
                prev_binary_chunk = chunk[e.start:]
                chunk = chunk[:e.start].decode("utf8")
        chunk = prev + chunk
        lines = chunk.split("\n")
        if not chunk.endswith("\n"):
            prev = lines[-1]
            lines = lines[:-1]
        else:
            prev = ""
        for line in lines:
            if line:
                yield line
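The carry-over logic can be exercised outside Airflow. The read_stream_lines function below is a hypothetical stand-alone adaptation of the patched method (it takes a plain iterable of byte chunks instead of an HTTPResponse), fed with “开始” split across two chunks:

```python
def read_stream_lines(chunks):
    """Same carry-over logic as the patched method, over an iterable of byte chunks."""
    prev = ""
    prev_binary_chunk = b""
    for chunk in chunks:
        chunk = prev_binary_chunk + chunk
        prev_binary_chunk = b""
        try:
            chunk = chunk.decode("utf8")
        except UnicodeDecodeError as e:
            if e.end != len(chunk):
                raise  # error not at the chunk boundary: real corruption
            prev_binary_chunk = chunk[e.start:]  # hold back the truncated bytes
            chunk = chunk[:e.start].decode("utf8")
        chunk = prev + chunk
        lines = chunk.split("\n")
        if not chunk.endswith("\n"):
            prev = lines[-1]
            lines = lines[:-1]
        else:
            prev = ""
        for line in lines:
            if line:
                yield line

# "开始" (b"\xe5\xbc\x80\xe5\xa7\x8b") split mid-character across two chunks:
chunks = [b"job \xe5\xbc\x80\xe5", b"\xa7\x8b\n"]
print(list(read_stream_lines(chunks)))  # ['job 开始']
```

An alternative worth considering is Python's built-in codecs.getincrementaldecoder("utf-8"), which buffers incomplete multi-byte sequences across feed() calls automatically and would achieve the same effect with less hand-rolled state.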