Datatailr Scheduler Module

The datatailr.scheduler module provides a framework for scheduling and managing batch jobs.

The main job types are:


  • Batch: Represents a batch job that can be scheduled and executed. The job can include multiple tasks which can be run in parallel or sequentially.
  • Service: Represents a service job that runs continuously.
  • App: Represents a web app or a dashboard, which can be built using one of the supported frameworks, such as Streamlit (https://streamlit.io/), Dash (https://dash.plotly.com/), or Panel (https://panel.holoviz.org/).
  • Excel: Represents an Excel add-in.

App

Deploy a web application or dashboard to Datatailr.

This supports Streamlit, Dash, or any framework that can be started via a standard entrypoint (for example, streamlit run app.py).

Parameters:

  • name

    (str) –

    App display name.

  • entrypoint

    (Callable | str) –

    Callable that starts the app server.

  • environment

    (Environment | None, default: DEV ) –

    Target environment for the job.

  • image

    (Image | None, default: None ) –

    Container image to use for execution.

  • run_as

    (str | User | None, default: None ) –

    User to execute the job as.

  • resources

    (Resources, default: Resources() ) –

    Resource requirements for the job.

  • acl

    (ACL | None, default: None ) –

    Access control settings for the job.

  • framework

    (str | None, default: None ) –

    Framework to use for the app.

  • python_version

    (str, default: '3.12' ) –

    Python version to use.

  • python_requirements

    (str | List[str], default: '' ) –

    Requirements to install (string or list).

  • build_script_pre

    (str, default: '' ) –

    Shell script to run before build.

  • build_script_post

    (str, default: '' ) –

    Shell script to run after build.

  • env_vars

    (dict[str, str | int | float | bool] | None, default: None ) –

    Environment variables to set.

  • get_existing

    (bool, default: False ) –

    Update an existing job with the same name.

Examples:

Minimal Streamlit app.

# app.py
import streamlit as st

def main():
    st.title("Hello Datatailr App")

if __name__ == "__main__":
    main()

Test locally.

streamlit run app.py

Deploy to Datatailr.

from app import main
from datatailr import App

app = App(
    name="Simple Dashboard App",
    entrypoint=main,
    python_requirements="streamlit")

app.run()

Parameters:

  • name

    (str) –

    Display name for the app.

  • entrypoint

    (Callable | str) –

    The callable (function) that starts the application.

  • environment

    (Environment | None, default: DEV ) –

    Target environment for the deployment.

  • image

    (Image | None, default: None ) –

    Pre-configured container Image. When None, an image is built from python_version, python_requirements, and the build scripts.

  • run_as

    (str | User | None, default: None ) –

    User or username to run the app as. Defaults to the currently signed-in user.

  • resources

    (Resources, default: Resources() ) –

    CPU and memory resources for the container.

  • acl

    (ACL | None, default: None ) –

    Access control list. Defaults to standard permissions for the current user.

  • framework

    (str | None, default: None ) –

    Optional framework name (e.g., 'streamlit', 'dash', 'flask'). If not provided, the framework is inferred from the entrypoint.

  • python_version

    (str, default: '3.12' ) –

    Python version for the container image.

  • python_requirements

    (str | List[str], default: '' ) –

    Python dependencies (see Image).

  • build_script_pre

    (str, default: '' ) –

    Dockerfile commands to run before installing requirements.

  • build_script_post

    (str, default: '' ) –

    Dockerfile commands to run after installing requirements.

  • env_vars

    (dict[str, str | int | float | bool] | None, default: None ) –

    Environment variables passed to the running container.

  • get_existing

    (bool, default: False ) –

    If True, load and update an existing job definition with the same name instead of creating a new one.

  • app_section

    (str, default: '' ) –

    The section to which the app belongs. If not provided, the app will be assigned to the default section. This affects the app launcher page.

  • budget

    (Budget | str | None, default: None ) –

    Optional spend budget, given as Budget("name") or a budget name str. If not provided, the 'default' budget is used.

DuplicateJobNameError

Exception raised when a job with a duplicate name is added to the batch.

EntryPoint

Represents an entry point for a Datatailr job. This can be a function or a callable object.

Environment

Deployment stage for Datatailr compute processes.

Each environment is a fully isolated stage in the development and release lifecycle. Workflows, apps, services, and other jobs run in Docker containers scoped to the selected environment, so changes in one stage do not affect the others.

New jobs default to DEV. After validation, promote them through PRE (staging) to PROD with Job.promote().

When a job runs inside the platform, the active environment is read from the DATATAILR_JOB_ENVIRONMENT variable (see get_dt_env()).

Attributes:

  • DEV

    Development environment for building and testing changes.

  • PRE

    Pre-production (staging) environment for final validation.

  • PROD

    Production environment for live workloads.

Examples:

>>> from datatailr import Environment
>>> from datatailr.scheduler import App
>>> app = App(name="my_app", environment=Environment.DEV)
>>> app.promote(from_environment=Environment.DEV)
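The promotion order and the environment lookup described above can be modelled with a small enum. This is a hypothetical sketch, not the datatailr Environment class or get_dt_env(): the names Stage, next_stage, and active_stage are illustrative, and the lowercase values and the fallback to DEV when DATATAILR_JOB_ENVIRONMENT is unset are assumptions.

```python
import os
from enum import Enum
from typing import Optional


class Stage(Enum):
    """Hypothetical stand-in for Environment; the string values are assumptions."""
    DEV = "dev"
    PRE = "pre"
    PROD = "prod"


# Promotion order as described above: DEV, then PRE (staging), then PROD.
_ORDER = [Stage.DEV, Stage.PRE, Stage.PROD]


def next_stage(stage: Stage) -> Optional[Stage]:
    """Return the stage a job would be promoted to, or None when already in PROD."""
    idx = _ORDER.index(stage)
    return _ORDER[idx + 1] if idx + 1 < len(_ORDER) else None


def active_stage(default: Stage = Stage.DEV) -> Stage:
    """Read the active stage from DATATAILR_JOB_ENVIRONMENT (assumed case-insensitive)."""
    raw = os.environ.get("DATATAILR_JOB_ENVIRONMENT")
    return Stage(raw.strip().lower()) if raw else default
```

Keeping the order in a single list means a promotion step is just "the next element", and PROD has no successor.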

ExcelAddin

Represents an Excel add-in deployment on Datatailr.

An Excel add-in exposes Python functions as Excel worksheet functions, allowing users to call server-side computations directly from Excel spreadsheets.

Example
from datatailr import ExcelAddin
from datatailr.excel import Addin

addin_def = Addin("Options Pricer", "Option pricing functions")

@addin_def.expose(description="Black-Scholes price")
def price_option(spot, strike, vol, rate, expiry):
    ...

def __excel_main__(port=8080, ws_port=8000):
    addin_def.run(port, ws_port)

addin = ExcelAddin(
    name="Options Pricer",
    entrypoint=__excel_main__,
    python_requirements=["numpy", "scipy"],
)
addin.run()

Parameters:

  • name

    (str) –

    Display name for the add-in.

  • entrypoint

    (Callable) –

    The callable (function) that starts the add-in server.

  • environment

    (Environment | None, default: DEV ) –

    Target environment for the deployment.

  • image

    (Image | None, default: None ) –

    Pre-configured container Image.

  • run_as

    (str | User | None, default: None ) –

    User or username to run the add-in as.

  • resources

    (Resources, default: Resources() ) –

    CPU and memory resources for the container.

  • acl

    (ACL | None, default: None ) –

    Access control list.

  • python_version

    (str, default: '3.12' ) –

    Python version for the container image.

  • python_requirements

    (str | List[str], default: '' ) –

    Python dependencies (see Image).

  • build_script_pre

    (str, default: '' ) –

    Dockerfile commands to run before installing requirements.

  • build_script_post

    (str, default: '' ) –

    Dockerfile commands to run after installing requirements.

  • env_vars

    (dict[str, str | int | float | bool] | None, default: None ) –

    Environment variables passed to the running container.

  • get_existing

    (bool, default: False ) –

    If True, update an existing job definition.

  • version

    (str | int | None, default: None ) –

    The version of the existing job to retrieve. If None, the latest version is used.

  • app_section

    (str, default: '' ) –

    The section to which the app belongs. If not provided, the app will be assigned to the default section. This affects the app launcher page.

  • budget

    (Budget | str | None, default: None ) –

    Optional spend budget, given as Budget("name") or a budget name str. If not provided, the 'default' budget is used.

Job

Base class for all Datatailr compute processes (App, Service, ExcelAddin, and workflows).

A Job encapsulates the container image configuration, resource requirements, access control, and deployment metadata needed to schedule and run work on the Datatailr platform. In most cases you should use the concrete subclasses (App, Service, ExcelAddin) or the @workflow decorator rather than instantiating Job directly.

Parameters:

  • name

    (str) –

    Display name for the job.

  • environment

    (Environment | None, default: DEV ) –

    Target environment (dev / pre / prod).

  • image

    (Image | None, default: None ) –

    Pre-configured container Image. When None, an image is built from python_version, python_requirements, and the build scripts.

  • run_as

    (str | User | None, default: None ) –

    User or username under which the job runs. Defaults to the currently signed-in user.

  • resources

    (Resources | None, default: None ) –

    CPU and memory resources for the container.

  • acl

    (ACL | None, default: None ) –

    Access control list. Defaults to standard permissions for the current user.

  • python_version

    (str, default: 'auto' ) –

    Python version for the container image.

  • python_requirements

    (str | List[str], default: '' ) –

    Python dependencies (see Image).

  • build_script_pre

    (str, default: '' ) –

    Dockerfile commands before pip install.

  • build_script_post

    (str, default: '' ) –

    Dockerfile commands after pip install.

  • env_vars

    (Dict[str, str | int | float | bool] | None, default: None ) –

    Environment variables passed to the container.

  • type

    (JobType | None, default: UNKNOWN ) –

    The kind of job (workflow, service, app, excel).

  • entrypoint

    (EntryPoint | None, default: None ) –

    The entry point that starts the job.

  • get_existing

    (bool, default: False ) –

    If True, load an existing job with the same name and update it instead of creating a new definition.

  • app_section

    (str, default: '' ) –

    The section to which the app belongs. If not provided, the app will be assigned to the default section. This affects the app launcher page.

  • budget

    (Budget | str | None, default: None ) –

    Optional spend budget as Budget("name") (preferred) or a budget name str, which is resolved the same way. Serialized to JSON as budget_id (see internal-docs/budgets.md). If not provided, the 'default' budget is used.

budget property

The job's budget. If an existing definition was loaded and the current user does not have permission for the job's budget, this returns None.

budget_id property

Budget id when set explicitly; None if omitted (platform default).

id property

Unique identifier for the job.

__enrich_task_runs__

Attach version-specific definition data (name, dependencies, resources) to each task run.

We can't rely on the currently-loaded workflow definition because the workflow might have been modified since the run completed, so the graph structure, task ids and resource sizing could all be stale. Instead we fetch the historical definition that produced this run and use it as the source of truth.

When the historical definition isn't available (e.g. it was pruned from the platform) we fall back to the current workflow's name map so the run details remain at least partially usable.

Tasks that are part of the workflow definition but never produced a run (e.g. they were skipped because an upstream task failed, or the run was a partial rerun) are surfaced as synthetic did_not_run task entries so the run details reflect the full workflow graph rather than only the tasks that executed.
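The merge described above can be sketched as a pure function over plain dictionaries. The keys (id, name, dependencies, resources, state) follow the run_details example output on this page; the function name enrich_task_runs and the shape of the definition argument are hypothetical, and the sketch omits the fallback to the current workflow's name map.

```python
from typing import Any, Dict, List


def enrich_task_runs(
    task_runs: List[Dict[str, Any]],
    definition: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Hypothetical sketch: merge task runs with the historical workflow definition.

    `definition` is assumed to be the task list of the workflow version that
    produced the run, each entry carrying id, name, dependencies and resources.
    """
    by_id = {task["id"]: task for task in definition}
    enriched: List[Dict[str, Any]] = []
    seen = set()

    for run in task_runs:
        merged = dict(run)
        spec = by_id.get(run["id"])
        if spec is not None:
            # The historical definition is the source of truth for graph data.
            for key in ("name", "dependencies", "resources"):
                merged[key] = spec.get(key)
        enriched.append(merged)
        seen.add(run["id"])

    # Tasks in the definition that never produced a run become synthetic entries.
    for task_id, spec in by_id.items():
        if task_id not in seen:
            enriched.append({
                "id": task_id,
                "name": spec.get("name"),
                "dependencies": spec.get("dependencies", []),
                "resources": spec.get("resources"),
                "state": "did_not_run",
            })
    return enriched
```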

__find_missing_python_packages

Compare the imports detected in the entry point code with the packages listed in python_requirements, and raise a warning if any imported packages are not listed there.

__get_existing__ classmethod

Retrieve an existing job instance from the Datatailr platform based on the job name and environment. If version is specified, the job with that version is retrieved; otherwise, the latest version is retrieved.

__resolve_task_id__

Resolve a workflow task id from either an explicit task_id or a task_name.

task_id takes precedence: when provided we use it directly without consulting the currently-loaded workflow definition. This matters for runs that come from older workflow versions whose graph (and therefore task names / ids) may have diverged from the current definition.

Returns None when the job is not a workflow (in which case the callers should address the job itself rather than a child task). Raises ValueError when task_name was given but no task with that name exists in the currently-loaded workflow.
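The precedence rules above can be written as a small standalone function. This is a hypothetical sketch: resolve_task_id and the task_names_to_ids mapping are illustrative names, and returning None when a workflow is given neither a task_id nor a task_name is an assumption the text does not cover.

```python
from typing import Dict, Optional


def resolve_task_id(
    is_workflow: bool,
    task_names_to_ids: Dict[str, int],
    task_name: Optional[str] = None,
    task_id: Optional[int] = None,
) -> Optional[int]:
    """Hypothetical sketch of the task-id resolution rules."""
    if not is_workflow:
        # Non-workflow jobs are addressed directly; there is no child task.
        return None
    if task_id is not None:
        # An explicit id wins and is used as-is, without consulting the
        # currently-loaded definition (important for older versions).
        return task_id
    if task_name is None:
        return None  # assumption: nothing to resolve
    try:
        return task_names_to_ids[task_name]
    except KeyError:
        raise ValueError(f"No task named {task_name!r} in the current workflow") from None
```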

__run_command__

Run a command in the context of the job. This is used to execute the job's entry point.

from_dict

Populate this Job instance from a dictionary (e.g. a platform API response).

Parameters:

  • job_dict

    (dict) –

    A dictionary containing the serialized job data.

get_schedule_args

Returns additional arguments for scheduling the job. Override or extend this method as needed.

logs

Fetch the captured stdout (or stderr) of a task within a run.

Parameters:

  • run_id

    (int) –

    The id of the run to read logs from.

  • task_name

    (str | None, default: None ) –

    For workflows, the display name of the task whose logs to fetch. For non-workflow jobs (App, Service, ExcelAddin, …) task_name is informational only — the job itself is addressed.

  • stderr

    (bool, default: False ) –

    If True, return the stderr stream instead of stdout.

  • lines

    (int, default: 5000 ) –

    Maximum number of trailing lines to return.

  • task_id

    (int | None, default: None ) –

    For workflows, the numeric task id (as exposed by run_details(...)['tasks'][i]['id']). When provided it takes precedence over task_name and is used as-is, which is the right choice for runs of older workflow versions whose graph may differ from the current one.

Returns:

  • str

    The captured log output as a string. May be empty when no log output is available for that run.

ls classmethod

List all jobs of this type.

Parameters:

  • environment

    (Environment | str | None, default: None ) –

    The environment to list jobs from. If not provided, it will list jobs from all environments.

  • filter_by

    (str | None, default: None ) –

    A filter expression to apply to the jobs. If not provided, it will list all jobs of this type.

Returns:

  • list[dict]

    A list of job dictionaries.

Example

>>> from datatailr import App
>>> App.ls(environment="dev")  # doctest: +SKIP
[{..."environment":"dev",..."name":"test-app",..."type":"app",...}]

>>> from datatailr import Service
>>> Service.ls(environment="dev")  # doctest: +SKIP
[{..."environment":"dev",..."name":"test-service",..."type":"service",...}]

promote

Promote the job to the next environment. This method promotes a version of the job from one environment to the next. If from_environment is not specified, the job is promoted from all environments.

Parameters:

  • from_environment –

    The environment to promote from. If None, the job is promoted from all environments.

  • version –

    The version to promote. If None, the latest version is promoted.

Returns:

  • tuple

    A tuple of (success: bool, message: str).

Example

from datatailr import Environment
from datatailr.scheduler import Job

job = Job(name="my_job", environment=Environment.DEV)
job.promote(from_environment=Environment.DEV, version=3)

This promotes version 3 of the job from the DEV environment to the next environment (PRE).

result

Fetch the cached return value of a task within a run.

Parameters:

  • run_id

    (int | str) –

    The id of the run to read the result from.

  • task_name

    (str | None, default: None ) –

    For workflows, the display name of the task whose result to load. For non-workflow jobs task_name is informational only.

  • task_id

    (int | None, default: None ) –

    For workflows, the numeric task id (as exposed by run_details(...)['tasks'][i]['id']). When provided it takes precedence over task_name and is used as-is, which is the right choice for runs of older workflow versions whose graph may differ from the current one.

Returns:

  • Any ( Any ) –

    The cached Python object returned by the task, or None if the task is still running, did not return anything, or its arguments cache entry has expired.

run

Run the job. This is equivalent to running job.save() and then job.start().

run_details

Get the details of a run.

Each task entry is enriched with workflow-definition data taken from the workflow version that actually produced this run (job_version). This means that historical runs surface the graph structure they were executed with, even if the workflow has since been redefined.

The returned tasks list covers every task in the workflow definition, not just the ones that executed. Tasks that never ran (e.g. skipped after an upstream failure, or excluded from a partial rerun) appear as synthetic entries with state == 'did_not_run'.

Example output

{'cost': 0.00012762248102453157,
 'end_time': datetime.datetime(2026, 4, 29, 11, 46, 32, 14506),
 'job_version': 1,
 'original_run_id': -1,
 'partial_rerun': False,
 'scheduled_time': None,
 'start_provisioning_time': datetime.datetime(2026, 4, 29, 11, 45, 57, 641051),
 'start_time': datetime.datetime(2026, 4, 29, 11, 45, 59, 95784),
 'state': 'completed',
 'submit_time': datetime.datetime(2026, 4, 29, 11, 45, 57, 652091),
 'tried_to_start_time': 0,
 'name': 'Simple Data Pipeline',
 'run_id': 12,
 'tasks': [{'cost': 1.3018850769988e-05,
            'end_time': datetime.datetime(2026, 4, 29, 11, 46, 17, 598195),
            'scheduled_time': None,
            'start_provisioning_time': datetime.datetime(2026, 4, 29, 11, 45, 59, 162721),
            'start_time': datetime.datetime(2026, 4, 29, 11, 46, 8, 697732),
            'state': 'completed',
            'submit_time': datetime.datetime(2026, 4, 29, 11, 45, 57, 652091),
            'tried_to_start_time': 0,
            'name': 'get_data',
            'id': 0,
            'dependencies': [],
            'resources': Resources(memory='256m', cpu=1.0)}]}

Parameters:

  • run_id –

    The id of the run to get the details of.

Returns:

  • dict

    A dictionary containing the details of the run.
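Because the returned structure is a plain dictionary, it can be post-processed directly. The sketch below uses a hypothetical helper, task_durations, to compute each executed task's wall-clock runtime from start_time and end_time. It skips synthetic did_not_run entries, which are assumed not to carry usable timestamps.

```python
import datetime
from typing import Any, Dict


def task_durations(details: Dict[str, Any]) -> Dict[str, float]:
    """Map task name -> runtime in seconds for tasks that have both timestamps."""
    durations: Dict[str, float] = {}
    for task in details.get("tasks", []):
        if task.get("state") == "did_not_run":
            continue  # synthetic entry: the task never executed
        start, end = task.get("start_time"), task.get("end_time")
        if isinstance(start, datetime.datetime) and isinstance(end, datetime.datetime):
            durations[task["name"]] = (end - start).total_seconds()
    return durations


# Values copied from the example output above (other keys omitted).
example = {
    "tasks": [{
        "name": "get_data",
        "state": "completed",
        "start_time": datetime.datetime(2026, 4, 29, 11, 46, 8, 697732),
        "end_time": datetime.datetime(2026, 4, 29, 11, 46, 17, 598195),
    }]
}
print(task_durations(example))
```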

runs

List all runs of the job. The list includes the top-level data and not the tasks. To retrieve the tasks, use the run_details method.

Example output

[]

Parameters:

  • start_time –

    The start time of the runs to filter by.

  • end_time –

    The end time of the runs to filter by.

  • refresh –

    If True, refresh the cached runs.

Returns:

  • list[dict]

    A list of mother-job (top-level) run dictionaries.

save

Save the job to the Datatailr platform. If the job already exists, it will be updated. The repository state is verified and an image is prepared for execution.

start

Start the job. For a workflow with a schedule, execution starts according to that schedule. Other job types, and workflows without a schedule, run immediately.

to_dict

Convert the Job instance to a dictionary representation.

to_json

Convert the Job instance to a JSON string representation.

verify_repo_is_ready

Verify that the repository is ready for job execution. The check consists of two parts:

  1. Check whether there are uncommitted changes in the repository.
  2. Check whether the local commit matches the remote HEAD (that is, the repository is synced with the remote).

Returns a tuple of (branch: str, commit_hash: str).
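The two checks can be sketched with plain git commands. This is an assumption about how such a check could work, not the library's code: the function names are hypothetical, and @{u} resolves the upstream tracking branch as of the last fetch, so a real check might run git fetch first. The pure check_repo_state function is separated out so it can be tested without a repository.

```python
import subprocess
from typing import Tuple


def _git(*args: str) -> str:
    """Run a git command in the current directory and return stripped stdout."""
    result = subprocess.run(["git", *args], check=True, capture_output=True, text=True)
    return result.stdout.strip()


def check_repo_state(porcelain_status: str, local_commit: str, remote_commit: str) -> None:
    """Raise if there are uncommitted changes or the local commit differs from the remote."""
    if porcelain_status.strip():
        raise RuntimeError("Repository has uncommitted changes")
    if local_commit != remote_commit:
        raise RuntimeError("Local commit does not match the remote HEAD")


def verify_repo_sketch() -> Tuple[str, str]:
    """Return (branch, commit_hash) if the working tree is clean and in sync."""
    branch = _git("rev-parse", "--abbrev-ref", "HEAD")
    commit = _git("rev-parse", "HEAD")
    # '@{u}' is the upstream tracking branch as last fetched.
    remote = _git("rev-parse", "@{u}")
    check_repo_state(_git("status", "--porcelain"), commit, remote)
    return branch, commit
```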

versions

List all versions of the job in the specified environment. If no environment is specified, versions across all environments are listed.

JobType

Enum representing different types of Datatailr jobs.

from_str classmethod

Create a JobType from a string while normalizing legacy aliases.
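Alias normalization of this kind is typically a lookup before the enum conversion. In the sketch below, Kind and kind_from_str are hypothetical, the member values mirror the kinds listed for Job.type, mapping the legacy name "batch" to a workflow is an illustrative assumption based on the module overview, and falling back to UNKNOWN instead of raising is also an assumption.

```python
from enum import Enum


class Kind(Enum):
    """Hypothetical stand-in for JobType; member values are assumptions."""
    WORKFLOW = "workflow"
    SERVICE = "service"
    APP = "app"
    EXCEL = "excel"
    UNKNOWN = "unknown"


# Illustrative legacy alias only; the real alias table is not documented here.
_ALIASES = {"batch": "workflow"}


def kind_from_str(value: str) -> Kind:
    """Normalize case and whitespace, map legacy aliases, fall back to UNKNOWN."""
    key = value.strip().lower()
    key = _ALIASES.get(key, key)
    try:
        return Kind(key)
    except ValueError:
        return Kind.UNKNOWN
```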

Resources dataclass

Represents the compute resources allocated to a job container.

Attributes:

  • memory (str) –

    Memory limit as a string (e.g. "256m", "1g").

  • cpu (float) –

    Number of CPU cores to allocate (e.g. 1, 0.5).
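A dataclass like this is easy to mirror with basic validation. The sketch below is hypothetical: ResourceSpec is not part of datatailr, its defaults are borrowed from the run_details example output rather than from Resources(), and treating the m and g suffixes as binary multiples (1g = 1024m) is an assumption.

```python
from dataclasses import dataclass

# Assumed binary multipliers for the memory suffixes shown above ("256m", "1g").
_UNITS_MIB = {"k": 1 / 1024, "m": 1, "g": 1024}


@dataclass(frozen=True)
class ResourceSpec:
    """Hypothetical mirror of Resources with a validation helper."""
    memory: str = "256m"
    cpu: float = 1.0

    def __post_init__(self) -> None:
        if self.cpu <= 0:
            raise ValueError("cpu must be positive")
        self.memory_mib()  # raises ValueError on a malformed memory string

    def memory_mib(self) -> float:
        """Convert the memory string to MiB, e.g. '1g' -> 1024.0."""
        text = self.memory.strip().lower()
        number, unit = text[:-1], text[-1:]
        if unit not in _UNITS_MIB or not number:
            raise ValueError(f"unsupported memory string: {self.memory!r}")
        return float(number) * _UNITS_MIB[unit]
```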

Schedule

Build a schedule object for batch/workflow jobs using friendly fields.

Examples:

schedule = Schedule(at_hours=[0])
schedule = Schedule(at_minutes=[0, 30], weekdays=["Mon", "Wed", "Fri"])

You can either provide a raw cron_expression or use the human-readable helper parameters (which are compiled into cron syntax automatically).

Parameters:

  • cron_expression

    (str, default: '' ) –

    A raw cron string (e.g. "0 */2 * * *"). If provided alongside helper parameters, the helpers take precedence.

  • at_minutes

    (list[int] | None, default: None ) –

    Specific minutes past the hour to run (e.g. [0, 30] for ":00" and ":30").

  • every_minute

    (int | None, default: None ) –

    Run every N minutes (e.g. 5 for every 5 minutes).

  • at_hours

    (list[int] | None, default: None ) –

    Specific hours of the day to run (0 to 23).

  • every_hour

    (int | None, default: None ) –

    Run every N hours.

  • weekdays

    (list[str] | None, default: None ) –

    Days of the week to run (e.g. ["mon", "wed", "fri"]).

  • day_of_month

    (int | None, default: None ) –

    Day of the month to run (1 to 31).

  • in_month

    (list[str] | None, default: None ) –

    Months to run in (e.g. ["jan", "apr", "jul", "oct"]).

  • every_month

    (int | None, default: None ) –

    Run every N months.

  • timezone

    (str | None, default: None ) –

    IANA timezone name (e.g. "America/New_York").

  • run_after_job_uuid

    (str | None, default: None ) –

    UUID of a job that must complete before this schedule triggers.

  • run_after_job_name

    (str | None, default: None ) –

    Name of a job that must complete before this schedule triggers.

  • run_after_job_condition

    (str | None, default: None ) –

    Required completion status of the predecessor job (e.g. "success").

Example
from datatailr import Schedule
# Every weekday at 08:00 and 16:00 UTC
s = Schedule(at_hours=[8, 16], weekdays=["mon","tue","wed","thu","fri"])

get_cron_string

Return the compiled cron string.

Returns:

  • str

    Cron string (str).

Examples:

>>> Schedule(
...     at_minutes=[0, 15, 30, 45],
...     at_hours=[0, 12],
...     weekdays=["Mon", "Wed", "Fri"],
...     day_of_month=15,
...     in_month=["Jan", "Jul"],
... ).get_cron_string()
'0 0,15,30,45 0,12 15 1,7 1,3,5'
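The compilation step can be sketched as follows. This is a hypothetical reimplementation, not the library's code. The six-field layout (seconds, minutes, hours, day of month, month, day of week, with Sunday = 0) is inferred from the example output above. The default of minute 0 when no minute field is given, and "*" for the other unset fields, are assumptions. Note that the cron_expression example earlier on this page uses five fields, so the real field layout should be checked against the library.

```python
from typing import Iterable, Optional

# Sunday = 0, matching "Mon, Wed, Fri" -> "1,3,5" in the example above.
_DAYS = ["sun", "mon", "tue", "wed", "thu", "fri", "sat"]
_MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
           "jul", "aug", "sep", "oct", "nov", "dec"]


def _field(values: Optional[Iterable[int]], every: Optional[int], default: str) -> str:
    """Render one cron field from explicit values, a step, or a default."""
    if values:
        return ",".join(str(v) for v in values)
    if every:
        return f"*/{every}"
    return default


def compile_cron(
    at_minutes=None, every_minute=None,
    at_hours=None, every_hour=None,
    weekdays=None, day_of_month=None,
    in_month=None, every_month=None,
) -> str:
    """Hypothetical sketch: build a six-field cron string (seconds first)."""
    seconds = "0"
    minutes = _field(at_minutes, every_minute, "0")  # assumed: top of the hour
    hours = _field(at_hours, every_hour, "*")
    dom = str(day_of_month) if day_of_month else "*"
    months = _field(
        [_MONTHS.index(m[:3].lower()) + 1 for m in in_month] if in_month else None,
        every_month, "*",
    )
    dow = _field([_DAYS.index(d[:3].lower()) for d in weekdays] if weekdays else None, None, "*")
    return " ".join([seconds, minutes, hours, dom, months, dow])
```

Under these assumptions, the weekday example above (at_hours=[8, 16] with Monday to Friday) would compile to "0 0 8,16 * * 1,2,3,4,5".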

Service

Represents a long-running background service deployment on Datatailr.

A service runs continuously (e.g., an API server, a message consumer, or any always-on process). It is restarted automatically if it exits.

Example
from datatailr import Service

# service.py
from flask import Flask

app = Flask(__name__)

@app.route("/health")
def health_check():
    return "OK"

# Service entrypoints receive the port from Datatailr.
def run_server(port):
    app.run("0.0.0.0", port=int(port), debug=False)

svc = Service(
    name="Simple Service",
    entrypoint=run_server,
    python_requirements=["flask"],
)
svc.run()

Parameters:

  • name

    (str) –

    Display name for the service.

  • entrypoint

    (Callable) –

    The callable (function) that starts the service.

  • environment

    (Environment | None, default: DEV ) –

    Target environment for the deployment.

  • image

    (Image | None, default: None ) –

    Pre-configured container Image.

  • run_as

    (str | User | None, default: None ) –

    User or username to run the service as.

  • resources

    (Resources, default: Resources() ) –

    CPU and memory resources for the container.

  • acl

    (ACL | None, default: None ) –

    Access control list.

  • python_version

    (str, default: '3.12' ) –

    Python version for the container image.

  • python_requirements

    (str | List[str], default: '' ) –

    Python dependencies (see Image).

  • build_script_pre

    (str, default: '' ) –

    Dockerfile commands to run before installing requirements.

  • build_script_post

    (str, default: '' ) –

    Dockerfile commands to run after installing requirements.

  • env_vars

    (dict[str, str | int | float | bool] | None, default: None ) –

    Environment variables passed to the running container.

  • get_existing

    (bool, default: False ) –

    If True, update an existing job definition.

  • version

    (str | int | None, default: None ) –

    The version of the existing job to retrieve. If None, the latest version is used.

  • budget

    (Budget | str | None, default: None ) –

    Optional spend budget, given as Budget("name") or a budget name str. If not provided, the 'default' budget is used.

Task

Represents a task within a batch job (workflow).

This class can be extended to define specific configurations for each task in the batch.

args property writable

Returns the arguments for the Task instance.

id property

Returns the unique identifier of the Task instance.

internal_name property

Returns the internal name of the Task instance.

__call__

Allows the Task instance to be called like a function, returning itself. This is useful for chaining or functional-style programming.

alias

Set an alias for the Task instance.

Parameters:

  • name –

    The alias name to set.

run

Execute the job's entrypoint.

set_resources

Set the resources for the Task instance.

Parameters:

  • resources –

    The Resources instance to set.

to_dict

Convert the Task instance to a dictionary representation.

to_json

Convert the Task instance to a JSON string representation.

translate_dependencies

Translate the dependencies of the Task instance into a format suitable for the batch job.

update_env_vars

Update the environment variables for the Task instance.

Parameters:

  • env_vars –

    A dictionary of environment variables to update.

TaskError

Exception raised for errors related to workflow tasks.

Workflow

Represents a workflow in the scheduler.

Inherits from Job and is used to define workflows with specific configurations.

next_job_id property

Returns a generator for the next task ID in the workflow.

add_job

Adds a task to the workflow.

Parameters:

  • job –

    The Task instance to add.

get_env_vars_copy

Returns a copy of the environment variables for the Workflow instance.

rerun

Rerun a given run with a specific version of the workflow (the latest version by default). If full is True, all tasks are rerun, including those that already ran successfully.

Parameters:

  • run_id –

    The id of the run to rerun.

  • version –

    The version to rerun. If None, the latest version is used.

  • full –

    If True, rerun the full workflow, including the tasks that already ran successfully.

Returns:

  • tuple

    A tuple of (success: bool, message: str).
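The difference between a full and a partial rerun comes down to which tasks are selected. The sketch below is a simplification under stated assumptions: tasks_to_rerun is a hypothetical helper, task entries are assumed to look like the run_details tasks list, and a partial rerun is assumed to select every task whose state is not 'completed'. The platform's actual selection rules may differ.

```python
from typing import Any, Dict, List


def tasks_to_rerun(tasks: List[Dict[str, Any]], full: bool = False) -> List[str]:
    """Hypothetical sketch: pick which task names a rerun would execute."""
    if full:
        # Full rerun: every task, including those that already completed.
        return [task["name"] for task in tasks]
    # Partial rerun: skip tasks that already completed successfully.
    return [task["name"] for task in tasks if task.get("state") != "completed"]
```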

reset

Reset the Workflow instance to its initial state.

set_local_run

Set the local run flag for the Workflow instance.

Parameters:

  • local_run –

    A boolean indicating whether to run locally.

to_dict

Convert the Workflow instance to a dictionary representation.

to_json

Convert the Workflow instance to a JSON string representation.

task

Decorator that turns a function into a workflow task.

Apply this via the public alias @task(). When the decorated function is called inside a workflow-decorated function, the call returns a Task object that becomes a node in the workflow DAG. Dependencies between tasks are inferred automatically from the data flow between calls.

Tasks are only registered for deployment when they are invoked from within the body of a @workflow-decorated function, which builds a Workflow. Calling a @task-decorated function outside that context executes it locally as a normal Python function instead.

See Task for the methods available on the returned object (for example alias() and set_resources()).
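Inferring dependencies from data flow means that when a call inside the workflow body receives the placeholder returned by an earlier call, that earlier call becomes an upstream node. The toy model below illustrates the idea and is not datatailr's implementation. ToyWorkflow and Node are hypothetical, the wrapped functions are never executed, and calling the same task twice simply overwrites its node. It uses the standard-library graphlib (Python 3.9+) to derive an execution order in which independent tasks could run in parallel.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Set


class Node:
    """Placeholder returned by a task call inside a workflow body (toy model)."""

    def __init__(self, name: str, deps: Set[str]) -> None:
        self.name = name
        self.deps = deps


class ToyWorkflow:
    """Records task calls and infers edges from the arguments they receive."""

    def __init__(self) -> None:
        self.graph: Dict[str, Set[str]] = {}

    def task(self, fn: Callable[..., Any]) -> Callable[..., Node]:
        def wrapper(*args: Any, **kwargs: Any) -> Node:
            # Any argument that is itself a Node is an upstream dependency.
            deps = {a.name for a in (*args, *kwargs.values()) if isinstance(a, Node)}
            self.graph[fn.__name__] = deps
            return Node(fn.__name__, deps)
        return wrapper

    def order(self) -> List[str]:
        """A valid execution order; tasks with no path between them could run in parallel."""
        return list(TopologicalSorter(self.graph).static_order())


# Mirrors the prepare/heavy_computation example in this section.
wf = ToyWorkflow()


@wf.task
def prepare():
    return 2


@wf.task
def heavy_computation(x, y):
    return x ** y


base = prepare()
result = heavy_computation(base, 10)
print(wf.graph, wf.order())
```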

Parameters:

  • memory

    (str, default: DEFAULT_TASK_MEMORY ) –

    Memory limit for this task's container (e.g. "256m", "1g").

  • cpu

    (float, default: DEFAULT_TASK_CPU ) –

    Number of CPU cores to allocate for this task (e.g. 1, 0.5).

Returns:

  • Task ( Callable[[Callable[..., Any]], Callable[..., Any]] ) –

    The decorated function. When that function is called inside a @workflow-decorated function, the call returns a Task object that becomes a node in the workflow DAG.

Example

# tasks.py
from datatailr import task

@task(memory="512m", cpu=2)
def heavy_computation(x, y):
    return x ** y

@task()
def prepare():
    return 2

Then, to deploy the workflow, use the following code:

# deploy.py
from datatailr import workflow
from tasks import heavy_computation, prepare

@workflow("My Workflow")
def my_workflow():
    base = prepare()
    heavy_computation(base, 10)

if __name__ == "__main__":
    my_workflow()  # deploy to Datatailr