Workflow scheduling
Automatically run workflows at specified times without manual triggering.
How it works
How it works
Workers automatically register workflow schedules with Workflows during startup. The system handles:
- Schedule registration with Workflows
- Periodic refresh of schedule definitions (every 10 seconds)
- Execution of workflows according to the schedule
Defining Schedules
Defining Schedules
Add schedules to workflows using cron expressions:
from mistralai.workflows.models import ScheduleDefinition
schedule = ScheduleDefinition(
input={"report_type": "daily"},
cron_expressions=["0 0 * * *"] # Daily at midnight UTC
)
@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
@workflows.workflow.entrypoint
async def run(self, report_type: str = "daily") -> None:
# Generate report
passDefining Schedule Policies
Defining Schedule Policies
Policies control what happens when schedules are missed or overlap:
catchup_window_seconds— If the platform was down or missed scheduled runs, it will retroactively trigger all missed executions within this window. Runs older than the window are skipped.overlap— What to do when a new run is due but the previous one is still running.SKIPdrops the new run,BUFFER_ONEqueues one pending run, andALLOW_ALLstarts all runs concurrently.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy
# Override default schedule policy
schedule_policy = SchedulePolicy(
catchup_window_seconds=86400, # Allow 1 day of catchup
overlap=ScheduleOverlapPolicy.SKIP, # Skip overlapping executions
)
schedule = ScheduleDefinition(
input={"report_type": "daily"},
cron_expressions=["0 0 * * *"], # Daily at midnight UTC
policy=schedule_policy
)
@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
@workflows.workflow.entrypoint
async def run(self, report_type: str = "daily") -> None:
# Generate report
passKey Considerations
Key Considerations
-
Worker Configuration:
- Ensure all workers have identical schedule configurations
- Mismatched configurations can cause conflicts and unexpected behavior
-
Schedule Definition:
- Uses standard cron syntax
- Includes input parameters for scheduled executions
- Supports multiple cron expressions per workflow
Complete Example
Complete Example
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy
# Run every Saturday at 3 AM UTC
backup_schedule = ScheduleDefinition(
input={"retention_days": 30},
cron_expressions=["0 3 * * 6"],
policy=SchedulePolicy(
catchup_window_seconds=604800, # 7 days
overlap=ScheduleOverlapPolicy.SKIP,
)
)
@workflows.workflow.define(name="database_backup_workflow", schedules=[backup_schedule])
class DatabaseBackupWorkflow:
@workflows.workflow.entrypoint
async def run(self, retention_days: int = 30) -> None:
print(f"Starting backup with {retention_days} day retention")
# Backup implementation here
# Start worker with:
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))Important Notes
Important Notes
- Schedules use UTC time zone by default
- Each schedule can specify different input parameters
- Workers automatically maintain schedule registrations
- Ensure consistent schedule definitions across all workers