Workflow scheduling

Automatically run workflows at specified times without manual triggering.

How it works

Workers automatically register workflow schedules with Workflows during startup. The system handles:

  1. Schedule registration with Workflows
  2. Periodic refresh of schedule definitions (every 10 seconds)
  3. Execution of workflows according to the schedule

Defining Schedules

Add schedules to workflows using cron expressions:

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"]  # Daily at midnight UTC
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily") -> None:
        # Generate report
        pass
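
To make the cron expressions above easier to reason about, here is a minimal sketch of how a five-field expression such as "0 0 * * *" can be checked against a UTC timestamp. It uses only the Python standard library and is not how the platform evaluates schedules. The names cron_matches and _field_matches are hypothetical, and the matcher is simplified: it handles `*`, lists, ranges, and steps, but not month or weekday names or `7` for Sunday.

```python
from datetime import datetime, timezone


def _field_matches(field: str, value: int, low: int, high: int) -> bool:
    """Return True if one cron field (e.g. "*/15", "1-5", "0,30") accepts value."""
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # "5/10" runs from 5 to the field maximum; a plain "5" means only 5.
            end = high if step_text else start
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check a five-field cron expression against a timezone-aware datetime, in UTC."""
    minute, hour, day, month, weekday = expression.split()
    moment = moment.astimezone(timezone.utc)
    cron_weekday = (moment.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday

    if not (
        _field_matches(minute, moment.minute, 0, 59)
        and _field_matches(hour, moment.hour, 0, 23)
        and _field_matches(month, moment.month, 1, 12)
    ):
        return False

    day_ok = _field_matches(day, moment.day, 1, 31)
    weekday_ok = _field_matches(weekday, cron_weekday, 0, 6)
    # Classic cron rule: if both day fields are restricted, either one may match.
    if day != "*" and weekday != "*":
        return day_ok or weekday_ok
    return day_ok and weekday_ok


midnight = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
print(cron_matches("0 0 * * *", midnight))
```

A helper like this can be handy in unit tests to confirm that an expression fires when you expect before attaching it to a workflow.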

Defining Schedule Policies

Policies control what happens when schedules are missed or overlap:

  • catchup_window_seconds: how far back, in seconds, the platform looks for missed runs. If the platform was down or missed scheduled runs, it retroactively triggers every missed execution within this window. Runs older than the window are skipped.
  • overlap: what to do when a new run is due but the previous one is still running. SKIP drops the new run, BUFFER_ONE queues one pending run, and ALLOW_ALL starts all runs concurrently.

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Override default schedule policy
schedule_policy = SchedulePolicy(
    catchup_window_seconds=86400,  # Allow 1 day of catchup
    overlap=ScheduleOverlapPolicy.SKIP,  # Skip overlapping executions
)

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"],  # Daily at midnight UTC
    policy=schedule_policy
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily") -> None:
        # Generate report
        pass
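
The two policy settings can be illustrated with a small standalone simulation. This is a sketch of the behavior described above, not the platform's implementation: the Overlap enum and the functions runs_to_catch_up and on_run_due are hypothetical stand-ins. The sketch also assumes that the window boundary is inclusive and that BUFFER_ONE drops a run when its single slot is already taken.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum


class Overlap(Enum):
    """Hypothetical stand-in for the three overlap options described above."""
    SKIP = "skip"
    BUFFER_ONE = "buffer_one"
    ALLOW_ALL = "allow_all"


def runs_to_catch_up(missed, now, catchup_window_seconds):
    """Keep only the missed run times inside the catchup window that ends at now."""
    cutoff = now - timedelta(seconds=catchup_window_seconds)
    return [run_time for run_time in missed if cutoff <= run_time <= now]


def on_run_due(policy, previous_still_running, already_buffered):
    """Return "start", "buffer", or "drop" for a run that has just come due."""
    if not previous_still_running or policy is Overlap.ALLOW_ALL:
        return "start"
    if policy is Overlap.BUFFER_ONE and not already_buffered:
        return "buffer"
    return "drop"  # SKIP, or BUFFER_ONE with its single slot already taken (assumed)


# Three daily midnight runs were missed; the platform recovers at noon on 10 January.
now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
missed = [datetime(2024, 1, day, tzinfo=timezone.utc) for day in (8, 9, 10)]
caught_up = runs_to_catch_up(missed, now, catchup_window_seconds=86400)
print(caught_up)  # only the 10 January run falls inside the one-day window
```

With a one-day window and a daily schedule, at most the most recent missed run is replayed. A longer window replays more runs, which is where the overlap setting starts to matter.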

Key Considerations

  1. Worker Configuration:

    • Ensure all workers have identical schedule configurations
    • Mismatched configurations can cause conflicts and unexpected behavior
  2. Schedule Definition:

    • Uses standard cron syntax
    • Includes input parameters for scheduled executions
    • Supports multiple cron expressions per workflow
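
One way to catch mismatched configurations before deployment is to compare a fingerprint of each worker's schedule settings. The sketch below is an illustration under assumptions: schedule_fingerprint is a hypothetical helper, and the plain dictionaries are an invented description format, not the library's serialization of ScheduleDefinition.

```python
import hashlib
import json


def schedule_fingerprint(schedules):
    """Hash a list of plain-dict schedule descriptions into a short digest.

    The digest ignores key order and the order of the schedules themselves.
    """
    canonical = sorted(json.dumps(item, sort_keys=True) for item in schedules)
    digest = hashlib.sha256("\n".join(canonical).encode("utf-8"))
    return digest.hexdigest()[:16]


# Hypothetical descriptions of what three workers would register.
worker_a = [{
    "workflow": "report_workflow",
    "cron_expressions": ["0 0 * * *"],
    "input": {"report_type": "daily"},
}]
worker_b = [{  # same settings as worker_a, keys in a different order
    "input": {"report_type": "daily"},
    "cron_expressions": ["0 0 * * *"],
    "workflow": "report_workflow",
}]
worker_c = [{  # differs from worker_a: runs at 01:00 instead of midnight
    "workflow": "report_workflow",
    "cron_expressions": ["0 1 * * *"],
    "input": {"report_type": "daily"},
}]

print(schedule_fingerprint(worker_a) == schedule_fingerprint(worker_b))
print(schedule_fingerprint(worker_a) == schedule_fingerprint(worker_c))
```

Logging such a fingerprint at worker startup, or comparing it in CI, makes a configuration drift visible before it causes conflicting registrations.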

Complete Example

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Run every Saturday at 3 AM UTC
backup_schedule = ScheduleDefinition(
    input={"retention_days": 30},
    cron_expressions=["0 3 * * 6"],
    policy=SchedulePolicy(
        catchup_window_seconds=604800,  # 7 days
        overlap=ScheduleOverlapPolicy.SKIP,
    )
)

@workflows.workflow.define(name="database_backup_workflow", schedules=[backup_schedule])
class DatabaseBackupWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, retention_days: int = 30) -> None:
        print(f"Starting backup with {retention_days} day retention")
        # Backup implementation here

# Start worker with:
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))
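
To sanity-check when a weekly expression like "0 3 * * 6" fires next, the following standard-library sketch computes the upcoming run time in UTC. The function next_weekly_run is a hypothetical helper for illustration and covers only the "one weekday, one time of day" case, not general cron expressions.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(after, cron_weekday, hour, minute=0):
    """Return the first UTC time strictly after `after` for a weekly cron slot.

    cron_weekday uses cron numbering (0 = Sunday ... 6 = Saturday).
    `after` must be a timezone-aware datetime.
    """
    after = after.astimezone(timezone.utc)
    python_weekday = (cron_weekday - 1) % 7  # Python numbering: 0 = Monday
    days_ahead = (python_weekday - after.weekday()) % 7
    candidate = (after + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    if candidate <= after:
        # This week's slot has already passed, so move to next week.
        candidate += timedelta(days=7)
    return candidate


# Wednesday 3 January 2024, 10:00 UTC
wednesday = datetime(2024, 1, 3, 10, 0, tzinfo=timezone.utc)
print(next_weekly_run(wednesday, cron_weekday=6, hour=3))  # 2024-01-06 03:00:00+00:00
```

Because consecutive runs are seven days apart, a catchup window of 604800 seconds covers roughly one missed weekly run.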

Important Notes

  • Schedules use the UTC time zone by default
  • Each schedule can specify different input parameters
  • Workers automatically maintain schedule registrations
  • Ensure consistent schedule definitions across all workers
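
Because cron expressions are interpreted in UTC, a run that should happen at a given local time has to be converted first. The sketch below shows one way to derive a daily UTC expression from a local time and a fixed UTC offset. The helper utc_cron_for_local_time is hypothetical, and the fixed offset is a simplification: in regions with daylight saving time the offset changes during the year, so a fixed UTC expression shifts by an hour relative to local wall-clock time.

```python
from datetime import datetime, timedelta, timezone


def utc_cron_for_local_time(hour, minute, utc_offset_hours):
    """Build a daily cron expression in UTC for a local time at a fixed UTC offset."""
    local_zone = timezone(timedelta(hours=utc_offset_hours))
    # The date is arbitrary; only the time of day matters for a daily schedule.
    local = datetime(2024, 1, 1, hour, minute, tzinfo=local_zone)
    as_utc = local.astimezone(timezone.utc)
    return f"{as_utc.minute} {as_utc.hour} * * *"


print(utc_cron_for_local_time(9, 0, utc_offset_hours=2))  # 0 7 * * *
```

For schedules restricted to a specific weekday or day of month, also check whether the conversion crosses midnight, since the UTC day may differ from the local day.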