Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content

Ensure that DB migrations handles all kinds of NaN values in historical xcoms#57866

Merged
vatsrahul1001 merged 8 commits into
apache:mainfrom
astronomer:fix-migration-issues
Nov 5, 2025
Merged

Ensure that DB migrations handles all kinds of NaN values in historical xcoms#57866
vatsrahul1001 merged 8 commits into
apache:mainfrom
astronomer:fix-migration-issues

Conversation

@amoghrajesh

Copy link
Copy Markdown
Contributor

While upgrading from 2.x -> 3.x if the DB had NaN values (either natively or within a string), things were failing, some attempts were made to fix it: #57614 but looks like native NaN broke due to this.

Hence attempting to fix it again properly to handle all sorts of cases.

How this was tested?

  1. Run breeze with 2.11 airflow
  2. Run this DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def push_nan_to_xcom(**kwargs):
    # This dict contains a NaN, which is not valid JSON
    value = [{"day": "2024-06-07", "ArticleCountMetric": float("nan")}]
    kwargs["ti"].xcom_push(key="bad_json", value=value)

def number_of_people(**kwargs):
    # This dict contains a NaN, which is not valid JSON
    list_of_people_in_space = [
        {"craft": "Tiangong", "name": "Ye GuangfuNaN"},
    ]
    kwargs["ti"].xcom_push(key="people_in_space", value=list_of_people_in_space)

def long_url(**kwargs):
    # This dict contains a NaN, which is not valid JSON

    value = {"name": "advisors-ndjson-20250107151944.ndjson.gz", "mime_type": "application/gzip", "data_type": "advisors-ndjson", "md5": "1f7b41a00548bdee85a3cd02c02efbc8", "size": 770854, "created_at": "2025-01-07T14:19:45.057179Z", "url": "https://storage.googleapis.com/prod-gain-bulk-files/advisors-ndjson-20250107151944.ndjson.gz?Expires=1736309071&GoogleAccessId=prod-gain-sa%40gain-prod-414309.iam.gserviceaccount.com&Signature=bxojFAGIn%2B5R1xlSeV91XFGA1ZBSINaNxKZOVHdaezneaFxvQ9TPiTJ%2BIfdZBJhZg8bEuXGIOg5xJ7U0Gu1%2Fe5R52JhH81SzkvshxUBZGaHrKKAVauXrxjzvgJ39QpUrOiYAs4GSq4MNYu1ZvVfOO8q%2B3sdO3X6z2QXbfbwXXVoMmZP4XNuiQRJWSNbDanlLDNEZqotYA%3D%3D", "schema": "advisors", "s3_path_latest": "bulk_files/advisors/historical/dt=2025-01-07/advisors-ndjson-20250107151944.ndjson.gz"}
    kwargs["ti"].xcom_push(key="long_url", value=value)


def array_nan(**kwargs):
    # This dict contains a NaN, which is not valid JSON

    value = [float("nan")]
    kwargs["ti"].xcom_push(key="array_nan", value=value)


with DAG(
    dag_id="xcom_nan_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t0 = PythonOperator(
        task_id="t1",
        python_callable=push_nan_to_xcom,
        provide_context=True,
    )
    t1 = PythonOperator(
        task_id="t2",
        python_callable=number_of_people,
        provide_context=True,
    )
    t2 = PythonOperator(
        task_id="t3",
        python_callable=long_url,
        provide_context=True,
    )
    t3 = PythonOperator(
        task_id="t4",
        python_callable=array_nan,
        provide_context=True,
    )

    [t0, t1, t2, t3]

Db entries:

1,t2,-1,people_in_space,xcom_nan_example,manual__2025-11-05T14:04:44.521434+00:00,"[{""craft"": ""Tiangong"", ""name"": ""Ye GuangfuNaN""}]",2025-11-05 14:04:46.402018 +00:00
1,t1,-1,bad_json,xcom_nan_example,manual__2025-11-05T14:04:44.521434+00:00,"[{""day"": ""2024-06-07"", ""ArticleCountMetric"": NaN}]",2025-11-05 14:04:46.402129 +00:00
1,t4,-1,array_nan,xcom_nan_example,manual__2025-11-05T14:04:44.521434+00:00,[NaN],2025-11-05 14:04:46.403036 +00:00
1,t3,-1,long_url,xcom_nan_example,manual__2025-11-05T14:04:44.521434+00:00,"{""name"": ""advisors-ndjson-20250107151944.ndjson.gz"", ""mime_type"": ""application/gzip"", ""data_type"": ""advisors-ndjson"", ""md5"": ""1f7b41a00548bdee85a3cd02c02efbc8"", ""size"": 770854, ""created_at"": ""2025-01-07T14:19:45.057179Z"", ""url"": ""https://storage.googleapis.com/prod-gain-bulk-files/advisors-ndjson-20250107151944.ndjson.gz?Expires=1736309071&GoogleAccessId=prod-gain-sa%40gain-prod-414309.iam.gserviceaccount.com&Signature=bxojFAGIn%2B5R1xlSeV91XFGA1ZBSINaNxKZOVHdaezneaFxvQ9TPiTJ%2BIfdZBJhZg8bEuXGIOg5xJ7U0Gu1%2Fe5R52JhH81SzkvshxUBZGaHrKKAVauXrxjzvgJ39QpUrOiYAs4GSq4MNYu1ZvVfOO8q%2B3sdO3X6z2QXbfbwXXVoMmZP4XNuiQRJWSNbDanlLDNEZqotYA%3D%3D"", ""schema"": ""advisors"", ""s3_path_latest"": ""bulk_files/advisors/historical/dt=2025-01-07/advisors-ndjson-20250107151944.ndjson.gz""}",2025-11-05 14:04:46.404231 +00:00

  1. stop_airflow
  2. Run airflow from main now without DB reset flag
DB: postgresql+psycopg2://postgres:***@postgres/airflow
Performing upgrade to the metadata database postgresql+psycopg2://postgres:***@postgres/airflow
/opt/airflow/airflow-core/src/airflow/utils/module_loading.py:44 DeprecatedImportWarning: Importing DagBag from airflow.models.dagbag is deprecated and will be removed in a future release. Please import from airflow.dag_processing.dagbag instead.
/opt/airflow/airflow-core/src/airflow/dag_processing/dagbag.py:40 DeprecationWarning: airflow.exceptions.AirflowDagCycleException is deprecated. Use airflow.sdk.exceptions.AirflowDagCycleException instead.
2025-11-05T14:05:49.218398Z [info     ] Context impl PostgresqlImpl.   [alembic.runtime.migration] loc=migration.py:211
2025-11-05T14:05:49.218515Z [info     ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-05T14:05:49.221563Z [info     ] Migrating the Airflow database [airflow.utils.db] loc=db.py:1131
2025-11-05T14:05:49.228722Z [info     ] Context impl PostgresqlImpl.   [alembic.runtime.migration] loc=migration.py:211
2025-11-05T14:05:49.228834Z [info     ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-05T14:05:49.267985Z [info     ] Running upgrade 22ed7efa9da2 -> 5f2621c13b39, Rename dag_schedule_dataset_alias_reference constraint names. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.276416Z [info     ] Running upgrade 5f2621c13b39 -> 044f740568ec, Drop ab_user.id foreign key. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.279476Z [info     ] Running upgrade 044f740568ec -> d0f1c55954fa, Remove SubDAGs: ``is_subdag`` & ``root_dag_id`` columns from DAG table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.282493Z [info     ] Running upgrade d0f1c55954fa -> 0bfc26bc256e, Rename DagModel schedule_interval to timetable_summary. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.284449Z [info     ] Running upgrade 0bfc26bc256e -> a2c32e6c7729, Add triggered_by field to DagRun. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.287154Z [info     ] Running upgrade a2c32e6c7729 -> 1cdc775ca98f, Rename execution_date to logical_date. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.292387Z [info     ] Running upgrade 1cdc775ca98f -> 522625f6d606, Add tables for backfill. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.307186Z [info     ] Running upgrade 522625f6d606 -> 16cbcb1c8c36, Remove redundant index. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.310421Z [info     ] Running upgrade 16cbcb1c8c36 -> 44eabb1904b4, Update dag_run_note.user_id and task_instance_note.user_id columns to String. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.320419Z [info     ] Running upgrade 44eabb1904b4 -> 0d9e73a75ee4, Add name and group fields to DatasetModel. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.333294Z [info     ] Running upgrade 0d9e73a75ee4 -> c3389cd7793f, Add backfill to dag run model. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.336457Z [info     ] Running upgrade c3389cd7793f -> 5a5d66100783, Add AssetActive to track orphaning instead of a flag. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.342600Z [info     ] Running upgrade 5a5d66100783 -> fb2d4922cd79, Tweak AssetAliasModel to match AssetModel after AIP-76. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.348313Z [info     ] Running upgrade fb2d4922cd79 -> 3a8972ecb8f9, Add exception_reason and logical_date to BackfillDagRun. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.349960Z [info     ] Running upgrade 3a8972ecb8f9 -> 05234396c6fc, Rename dataset as asset. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.392131Z [info     ] Running upgrade 05234396c6fc -> d59cbbef95eb, Add UUID primary key to ``task_instance`` table. [alembic.runtime.migration] loc=migration.py:622
processing batch 1
Migrated 4 task_instance rows in this batch...
processing batch 2
2025-11-05T14:05:49.409086Z [info     ] Running upgrade d59cbbef95eb -> 486ac7936b78, remove scheduler_lock column. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.410586Z [info     ] Running upgrade 486ac7936b78 -> 5f57a45b8433, Drop task_fail table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.412300Z [info     ] Running upgrade 5f57a45b8433 -> d8cd3297971e, Add last_heartbeat_at directly to TI. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.414971Z [info     ] Running upgrade d8cd3297971e -> d03e4a635aa3, Drop DAG pickling. [alembic.runtime.migration] loc=migration.py:622
──0: ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬──1: Triggerer───────────────────────────────────────────────────────────────────────────────────────────────────────────
Welcome to your tmux based running Airflow environment (courtesy of Breeze).                                               │job_runner.py:735
                                                                                                                           │2025-11-05T14:07:01.737362Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
     To stop Airflow and exit tmux, just type 'stop_airflow'.                                                              │job_runner.py:735
                                                                                                                           │2025-11-05T14:07:34.409813Z [info     ] Triggerer's async thread was blocked for 0.24 seconds, likely by a badly-written
     If you want to build webserver assets dynamically, run start-airflow command with --dev-mode                          │trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines. [airflow.jobs.triggerer_job_runner]
                                                                                                                           │loc=triggerer_job_runner.py:735
root@534c33b7a675:/opt/airflow# stop_airflow                                                                               │2025-11-05T14:08:01.993553Z [info     ] 0 triggers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:08:01.993820Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:09:02.301305Z [info     ] 0 triggers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:09:02.304109Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:10:02.577264Z [info     ] 0 triggers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:10:02.578791Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:11:02.865603Z [info     ] 0 triggers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │2025-11-05T14:11:02.865900Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_
                                                                                                                           │job_runner.py:735
                                                                                                                           │^C
                                                                                                                           │exit
                                                                                                                           │2025-11-05T14:11:05.659518Z [info     ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobR
                                                                                                                           │unner] loc=triggerer_job_runner.py:176

──2: Scheduler─────────────────────────────────────────────────────────────────────────────────┬──3: API Server────────────┴──────────────────────────────────────────────┬──4: DAG Processor────────────────────────────────────────────────────────
2025-11-05T14:06:01.332277Z [info     ] Starting the scheduler         [airflow.jobs.scheduler_│ _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/                          │dags-folder  hitl.py
job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1066                                │2025-11-05T14:06:00.994195Z [warning  ] Failed to convert value. Please ch│          1           0  0.03s            2025-11-05T14:10:36
2025-11-05T14:06:01.336092Z [info     ] Loaded executor: :LocalExecutor: [airflow.executors.exe│eck memray_trace_components key in profiling section. it must be one of sc│dags-folder  repro_trigger_crash.py
cutor_loader] loc=executor_loader.py:349                                                       │heduler, dag_processor, api, if not the value is ignored [airflow.configur│          1           0  0.03s            2025-11-05T14:10:38
2025-11-05T14:06:01.342570Z [info     ] Adopting or resetting orphaned tasks for active dag run│ation] loc=configuration.py:1283                                          │dags-folder  notifier-dag.py
s [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2314      │2025-11-05T14:06:00.994317Z [info     ] Running the uvicorn with:         │          1           0  0.03s            2025-11-05T14:10:38
2025-11-05T14:06:01.346752Z [info     ] Marked 1 SchedulerJob instances as failed [airflow.jobs│Apps: all                                                                 │dags-folder  logurl.py
.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2338                     │Workers: 1                                                                │          1           0  0.03s            2025-11-05T14:10:36
WARNING:  ASGI app factory detected. Using it, but please consider setting the --factory flag e│Host: 0.0.0.0:8080                                                        │dags-folder  pierre.py
xplicitly.                                                                                     │Timeout: 120                                                              │          0           1  0.03s            2025-11-05T14:10:42
INFO:     Started server process [152]                                                         │Logfiles: -                                                               │dags-folder  cleanup-xcom.py
INFO:     Waiting for application startup.                                                     │================================================================= [airflow│          0           1  0.03s            2025-11-05T14:10:38
INFO:     Application startup complete.                                                        │.cli.commands.api_server_command] loc=api_server_command.py:55            │dags-folder  smtp_email.py
INFO:     Uvicorn running on http://:8793 (Press CTRL+C to quit)                               │/opt/airflow/airflow-core/src/airflow/api_fastapi/execution_api/routes/__i│          0           1  0.03s            2025-11-05T14:10:36
2025-11-05T14:11:01.391331Z [info     ] Adopting or resetting orphaned tasks for active dag run│nit__.py:23 DeprecationWarning: 'HTTP_422_UNPROCESSABLE_ENTITY' is depreca│dags-folder  with-callbacks.py
s [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2314      │ted. Use 'HTTP_422_UNPROCESSABLE_CONTENT' instead.                        │          1           0  0.03s            2025-11-05T14:10:38
^C                                                                                             │__file__='/files/plugins/triggera.py' loaded                              │dags-folder  with-fail-fast.py
exit                                                                                           │__file__='/files/plugins/triggera_comprehensive.py' loaded                │          1           0  0.03s            2025-11-05T14:10:41
2025-11-05T14:11:05.649268Z [info     ] Exiting gracefully upon receiving signal 2 [airflow.job│/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/_│dags-folder  rendered.py
s.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:287                     │_init__.py:36 DeprecationWarning: 'HTTP_422_UNPROCESSABLE_ENTITY' is depre│          1           0  0.03s            2025-11-05T14:10:42
2025-11-05T14:11:05.649672Z [info     ] Shutting down LocalExecutor; waiting for running tasks │cated. Use 'HTTP_422_UNPROCESSABLE_CONTENT' instead.                      │dags-folder  xcom_push_pull.py
to finish.  Signal again if you don't want to wait. [airflow.executors.local_executor.LocalExec│INFO:     Started server process [127]                                    │          0           1  0.03s            2025-11-05T14:10:36
utor] loc=local_executor.py:221                                                                │INFO:     Waiting for application startup.                                │==========================================================================
2025-11-05T14:11:05.650114Z [info     ] Exited execute loop            [airflow.jobs.scheduler_│INFO:     Application startup complete.                                   │====== [airflow.dag_processing.manager.DagFileProcessorManager] loc=manage
job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1107                                │INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)   │r.py:785
                                                                                               │                                                                          │
[Airflow] 0:Main*                                                                                                                                                                                                      "534c33b7a675" 14:11 05-Nov-25
2025-11-05T14:05:49.416836Z [info     ] Running upgrade d03e4a635aa3 -> 2b47dc6bc8df, add dag versioning. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.429959Z [info     ] Running upgrade 2b47dc6bc8df -> 9fc3fc5de720, Add references between assets and triggers. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.435273Z [info     ] Running upgrade 9fc3fc5de720 -> eed27faa34e3, Remove pickled data from xcom table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.445038Z [info     ] Running upgrade eed27faa34e3 -> e229247a6cb1, Add DagBundleModel. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.451036Z [info     ] Running upgrade e229247a6cb1 -> 038dc8bc6284, update trigger_timeout column in task_instance table to UTC. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.452665Z [info     ] Running upgrade 038dc8bc6284 -> 237cef8dfea1, Add deadline alerts table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.457857Z [info     ] Running upgrade 237cef8dfea1 -> 5c9c0231baa2, Remove processor_subdir. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.459407Z [info     ] Running upgrade 5c9c0231baa2 -> 38770795785f, Add asset reference models. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.468874Z [info     ] Running upgrade 38770795785f -> e39a26ac59f6, remove pickled data from dagrun table. [alembic.runtime.migration] loc=migration.py:622
converting dag run conf. batch=1
2025-11-05T14:05:49.471116Z [info     ] Running upgrade e39a26ac59f6 -> 8ea135928435, Add relative fileloc column. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.472322Z [info     ] Running upgrade 8ea135928435 -> 33b04e4bfa19, add new task_instance field scheduled_dttm. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.473694Z [info     ] Running upgrade 33b04e4bfa19 -> 6a9e7a527a88, Add DagRun run_after. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.476119Z [info     ] Running upgrade 6a9e7a527a88 -> e00344393f31, remove external_trigger field. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.477213Z [info     ] Running upgrade e00344393f31 -> 7645189f3479, Add try_id to TI and TIH. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.480559Z [info     ] Running upgrade 7645189f3479 -> cf87489a35df, Use TI.id as primary key to TaskInstanceNote. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.484610Z [info     ] Running upgrade cf87489a35df -> 16f7f5ee874e, Remove dag.default_view column. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.485762Z [info     ] Running upgrade 16f7f5ee874e -> d469d27e2a64, Use ti_id as FK to TaskReschedule. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.488950Z [info     ] Running upgrade d469d27e2a64 -> be2cc2f742cf, Support bundles in DagPriorityParsingRequest. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.490533Z [info     ] Running upgrade be2cc2f742cf -> ec62e120484d, Add new otel span fields. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.493095Z [info     ] Running upgrade ec62e120484d -> 0e9519b56710, Rename run_type from 'dataset_triggered' to 'asset_triggered' in dag_run table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.494554Z [info     ] Running upgrade 0e9519b56710 -> 959e216a3abb, Rename ``is_active`` to ``is_stale`` column in ``dag`` table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.496065Z [info     ] Running upgrade 959e216a3abb -> 29ce7909c52b, Change TI table to have unique UUID id/pk per attempt. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.499575Z [info     ] Running upgrade 29ce7909c52b -> fe199e1abd77, Delete import errors. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.500646Z [info     ] Running upgrade fe199e1abd77 -> dfee8bd5d574, Add Deadline to Dag. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.501820Z [info     ] Running upgrade dfee8bd5d574 -> 0242ac120002, Rename Deadline column in the Deadline table from deadline to deadline_time and change its type from DateTime to UTC DateTime. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.546878Z [info     ] Running upgrade 0242ac120002 -> 3ac9e5732b1f, Change the on-delete behaviour of task_instance.dag_version_id foreign key constraint to RESTRICT. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.549359Z [info     ] Running upgrade 3ac9e5732b1f -> 583e80dfcef4, Add task_inlet_asset_reference table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.555508Z [info     ] Running upgrade 583e80dfcef4 -> 66a7743fe20e, Add triggering user to dag_run. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.557476Z [info     ] Running upgrade 66a7743fe20e -> ffdb0566c7c0, Add dag_favorite table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.560686Z [info     ] Running upgrade ffdb0566c7c0 -> 40f7c30a228b, Add Human In the Loop Detail table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.564873Z [info     ] Running upgrade 40f7c30a228b -> 09fa89ba1710, Add trigger_id to deadline. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.566621Z [info     ] Running upgrade 09fa89ba1710 -> f56f68b9e02f, Add callback_state to deadline. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.569145Z [info     ] Running upgrade f56f68b9e02f -> 3bda03debd04, Add url template and template params to DagBundleModel. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.571414Z [info     ] Running upgrade 3bda03debd04 -> 808787349f22, Modify deadline's callback schema. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.572739Z [info     ] Running upgrade 808787349f22 -> a169942745c2, Remove dag_id from Deadline. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.574039Z [info     ] Running upgrade a169942745c2 -> 7582ea3f3dd5, Make bundle_name not nullable. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.576653Z [info     ] Running upgrade 7582ea3f3dd5 -> a3c7f2b18d4e, Add tables to store teams and associations with dag bundles. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.610487Z [info     ] Running upgrade a3c7f2b18d4e -> eaf332f43c7c, add last_parse_duration to dag model. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.612116Z [info     ] Running upgrade eaf332f43c7c -> cc92b33c6709, Add backward compatibility for serialized DAG format v3 to v2. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.613187Z [info     ] Running upgrade cc92b33c6709 -> 15d84ca19038, replace asset_trigger table with asset_watcher. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.618617Z [info     ] Running upgrade 15d84ca19038 -> ab6dc0c82d0e, Change ``serialized_dag`` data column to JSONB for PostgreSQL. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.623126Z [info     ] Running upgrade ab6dc0c82d0e -> 1b2c3d4e5f6g, Add length to dag_bundle_team.dag_bundle_name. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.624950Z [info     ] Running upgrade 1b2c3d4e5f6g -> 5cc8117e9285, Add Human In the Loop Detail History table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.629188Z [info     ] Running upgrade 5cc8117e9285 -> 69ddce9a7247, Add ``fail_fast`` column to dag table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.630689Z [info     ] Running upgrade 69ddce9a7247 -> b87d2135fa50, Restructure callback table. [alembic.runtime.migration] loc=migration.py:622
2025-11-05T14:05:49.636113Z [info     ] Context impl PostgresqlImpl.   [alembic.runtime.migration] loc=migration.py:211
2025-11-05T14:05:49.636221Z [info     ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
Database migrating done!
Skipping user creation as auth manager different from Fab is used

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh amoghrajesh left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works for all my test cases, just not sure if this is the best way forward. Assisted a lot by cursor

@amoghrajesh amoghrajesh self-assigned this Nov 5, 2025
@amoghrajesh

Copy link
Copy Markdown
Contributor Author

Alright, with head at: 0587a26, I tried both postgres and mysql, and it works fine for both of them with the same testing instructions as in the PR desc with the DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def push_nan_to_xcom(**kwargs):
    # This dict contains a NaN, which is not valid JSON
    value = [{"day": "2024-06-07", "ArticleCountMetric": float("nan")}]
    kwargs["ti"].xcom_push(key="bad_json", value=value)

def number_of_people(**kwargs):
    # This dict contains a NaN, which is not valid JSON
    list_of_people_in_space = [
        {"craft": "Tiangong", "name": "Ye GuangfuNaN"},
    ]
    kwargs["ti"].xcom_push(key="people_in_space", value=list_of_people_in_space)

def long_url(**kwargs):
    # This dict contains a NaN, which is not valid JSON

    value = {"name": "advisors-ndjson-20250107151944.ndjson.gz", "mime_type": "application/gzip", "data_type": "advisors-ndjson", "md5": "1f7b41a00548bdee85a3cd02c02efbc8", "size": 770854, "created_at": "2025-01-07T14:19:45.057179Z", "url": "https://storage.googleapis.com/prod-gain-bulk-files/advisors-ndjson-20250107151944.ndjson.gz?Expires=1736309071&GoogleAccessId=prod-gain-sa%40gain-prod-414309.iam.gserviceaccount.com&Signature=bxojFAGIn%2B5R1xlSeV91XFGA1ZBSINaNxKZOVHdaezneaFxvQ9TPiTJ%2BIfdZBJhZg8bEuXGIOg5xJ7U0Gu1%2Fe5R52JhH81SzkvshxUBZGaHrKKAVauXrxjzvgJ39QpUrOiYAs4GSq4MNYu1ZvVfOO8q%2B3sdO3X6z2QXbfbwXXVoMmZP4XNuiQRJWSNbDanlLDNEZqotYA%3D%3D", "schema": "advisors", "s3_path_latest": "bulk_files/advisors/historical/dt=2025-01-07/advisors-ndjson-20250107151944.ndjson.gz"}
    kwargs["ti"].xcom_push(key="long_url", value=value)


def array_nan(**kwargs):
    # This dict contains a NaN, which is not valid JSON

    value = [float("nan")]
    kwargs["ti"].xcom_push(key="array_nan", value=value)


with DAG(
    dag_id="xcom_nan_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t0 = PythonOperator(
        task_id="t1",
        python_callable=push_nan_to_xcom,
        provide_context=True,
    )
    t1 = PythonOperator(
        task_id="t2",
        python_callable=number_of_people,
        provide_context=True,
    )
    t2 = PythonOperator(
        task_id="t3",
        python_callable=long_url,
        provide_context=True,
    )
    t3 = PythonOperator(
        task_id="t4",
        python_callable=array_nan,
        provide_context=True,
    )

    [t0, t1, t2, t3]

And for 2.10.0 -> main (3.2.0)

@vatsrahul1001

vatsrahul1001 commented Nov 5, 2025

Copy link
Copy Markdown
Contributor

Tested this looks good to me.

@vatsrahul1001 vatsrahul1001 requested a review from ashb November 5, 2025 17:59
@vatsrahul1001 vatsrahul1001 added this to the Airflow 3.1.3 milestone Nov 5, 2025
@amoghrajesh amoghrajesh changed the title Make sure DB migration handles all kinds of NaN values in xcom Ensure that DB migrations handles all kinds of NaN values in historical xcoms Nov 5, 2025
@vatsrahul1001 vatsrahul1001 merged commit 5168e62 into apache:main Nov 5, 2025
63 checks passed
@vatsrahul1001 vatsrahul1001 deleted the fix-migration-issues branch November 5, 2025 19:24
@github-actions

github-actions Bot commented Nov 5, 2025

Copy link
Copy Markdown
Contributor

Backport failed to create: v3-1-test. View the failure log Run details

Status Branch Result
v3-1-test Commit Link

You can attempt to backport this manually by running:

cherry_picker 5168e62 v3-1-test

This should apply the commit to the v3-1-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

amoghrajesh added a commit to amoghrajesh/airflow that referenced this pull request Nov 5, 2025
…al xcoms (apache#57866)

Ensure that DB migrations handles all kinds of NaN values in historical xcoms

(cherry picked from commit 5168e62)
@amoghrajesh

Copy link
Copy Markdown
Contributor Author

Manual cherry pick: #57893

vatsrahul1001 pushed a commit that referenced this pull request Nov 5, 2025
…al xcoms (#57866) (#57893)

Ensure that DB migrations handles all kinds of NaN values in historical xcoms

(cherry picked from commit 5168e62)
xchwan pushed a commit to xchwan/airflow that referenced this pull request Nov 6, 2025
…al xcoms (apache#57866)

Ensure that DB migrations handles all kinds of NaN values in historical xcoms
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Nov 10, 2025
ephraimbuddy pushed a commit that referenced this pull request Nov 10, 2025
…al xcoms (#57866) (#57893)

Ensure that DB migrations handles all kinds of NaN values in historical xcoms

(cherry picked from commit 5168e62)
Copilot AI pushed a commit to jason810496/airflow that referenced this pull request Dec 5, 2025
…al xcoms (apache#57866)

Ensure that DB migrations handles all kinds of NaN values in historical xcoms
jedcunningham added a commit to astronomer/airflow that referenced this pull request Mar 2, 2026
XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from apache#57866).
vatsrahul1001 added a commit that referenced this pull request Mar 3, 2026
XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from #57866).

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
jedcunningham added a commit to astronomer/airflow that referenced this pull request Mar 3, 2026
XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from apache#57866).

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
(cherry picked from commit 7a301e6)
jedcunningham added a commit that referenced this pull request Mar 3, 2026
…2760)

XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from #57866).


(cherry picked from commit 7a301e6)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
vatsrahul1001 added a commit that referenced this pull request Mar 4, 2026
…2760)

XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from #57866).


(cherry picked from commit 7a301e6)

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
dominikhei pushed a commit to dominikhei/airflow that referenced this pull request Mar 11, 2026
XCom values containing float('nan'), float('inf'), or float('-inf')
caused the database migration to silently corrupt data or fail
outright when upgrading. Three bugs were present across backends:

- Consecutive tokens (e.g. [NaN, NaN]) were only partially replaced,
  leaving bare NaN/Infinity in the output and breaking the JSON cast.
- Infinity and -Infinity were not handled at all — only NaN was.
- Bare top-level values (a single NaN or Infinity, not inside a list
  or dict) were not matched and passed through unconverted.

MySQL also had two bugs in the replacement query that caused it to produce
the wrong output (one of these was pre-existing from apache#57866).

Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:db-migrations PRs with DB migration kind:documentation type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants