Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,10 @@ def finalize(
if getattr(ti.task, "overwrite_rtif_after_execution", False):
log.debug("Overwriting Rendered template fields.")
if ti.task.template_fields:
SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
try:
SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
except Exception:
log.exception("Failed to set rendered fields during finalization", ti=ti, task=ti.task)

log.debug("Running finalizers", ti=ti)
if state == TaskInstanceState.SUCCESS:
Expand Down
46 changes: 46 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
AirflowException,
AirflowFailException,
AirflowRescheduleException,
AirflowRuntimeError,
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
Expand Down Expand Up @@ -2179,6 +2180,51 @@ def __init__(self, bash_command, *args, **kwargs):
msg=SetRenderedFields(rendered_fields={"bash_command": rendered_cmd})
)

def test_overwrite_rtif_after_execution_handles_errors_gracefully(
self, create_runtime_ti, mock_supervisor_comms
):
"""
Test that errors during SetRenderedFields in finalize() don't mask the original task error.
"""

class TaskWithRTIF(BaseOperator):
overwrite_rtif_after_execution = True
template_fields = ["command"]

def __init__(self, command, *args, **kwargs):
self.command = command
super().__init__(*args, **kwargs)

task = TaskWithRTIF(task_id="test_task", command="test command")
runtime_ti = create_runtime_ti(task=task)
mock_log = mock.MagicMock()

# mock the SetRenderedFields call to fail with API_SERVER_ERROR
mock_supervisor_comms.send.side_effect = AirflowRuntimeError(
error=ErrorResponse(
error=ErrorType.API_SERVER_ERROR,
detail={
"status_code": 404,
"message": "Not Found",
"detail": {"detail": "Not Found"},
},
)
)

finalize(
runtime_ti,
state=TaskInstanceState.FAILED,
context=runtime_ti.get_template_context(),
log=mock_log,
error=Exception("Task execution failed"),
)

mock_log.exception.assert_called_once_with(
"Failed to set rendered fields during finalization",
ti=runtime_ti,
task=task,
)

@pytest.mark.parametrize(
("task_reschedule_count", "expected_date"),
[
Expand Down
Loading