Taskinity is a modern framework for defining, managing, and monitoring task flows using an intuitive DSL and Python decorators. Designed with simplicity and efficiency in mind, Taskinity carries significantly less overhead than Prefect, Airflow, or Luigi and works out of the box without complicated configuration.
Our mission is to provide a simple yet powerful task orchestration tool that allows teams to focus on business logic rather than infrastructure management. We believe workflow automation should be accessible to everyone, regardless of team size or budget.
Taskinity achieves this through an intuitive DSL, plain Python decorators, and a zero-configuration setup.
```bash
# Installation with pip
pip install taskinity

# OR installation with poetry
poetry add taskinity

# Run an example
python -m examples.basic_flow
```
To enable syntax highlighting and flow diagram rendering in your Markdown files, add this script tag at the end of the file:

```html
<script src="https://taskinity.github.io/render/taskinity-render.min.js"></script>
```
This will automatically apply syntax highlighting to code blocks and render Taskinity DSL flow definitions as diagrams.
# My Taskinity Project
This project uses Taskinity for workflow automation.
## Flow Definition
```
flow DataProcessing:
    description: "Data Processing Flow"
    fetch_data -> process_data
    process_data -> analyze_data
```
## Implementation
```python
from taskinity import task, run_flow_from_dsl

@task(name="Fetch Data")
def fetch_data():
    return {"data": [1, 2, 3, 4, 5]}

@task(name="Process Data")
def process_data(data):
    return {"processed": [x * 2 for x in data["data"]]}
```
```html
<!-- Add this at the end of your README.md -->
<script src="https://taskinity.github.io/render/taskinity-render.min.js"></script>
```
```python
from taskinity import task, run_flow_from_dsl

# Define tasks
@task(name="Fetch Data")
def fetch_data(url: str):
    # Implementation (placeholder)
    data = {"data": [1, 2, 3, 4, 5], "source": url}
    return data

@task(name="Process Data")
def process_data(data):
    # Implementation (placeholder)
    processed_data = {"processed": [x * 2 for x in data["data"]]}
    return processed_data

# Define flow using DSL
flow_dsl = """
flow DataProcessing:
    description: "Data Processing Flow"
    fetch_data -> process_data
"""

# Run the flow
results = run_flow_from_dsl(flow_dsl, {"url": "https://example.com/data"})
```
Taskinity follows a modular architecture for better organization and extensibility. The core module (`taskinity/core/`) contains the essential functionality, while optional extensions add features such as parallel execution, notifications, and plugins on top of it.
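A sketch of how the split looks from the import side (the package-level re-export of `parse_dsl` is an assumption; the other imports appear verbatim in the sections below):

```python
# Core API: assumed to be re-exported at the package level from taskinity/core
from taskinity import task, run_flow_from_dsl, parse_dsl

# Optional extensions live in their own modules (see the sections below)
from taskinity.parallel_executor import run_parallel_flow_from_dsl
from taskinity.extensions import register_plugin
```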
```
flow EmailProcessing:
    description: "Email Processing Flow"
    fetch_emails -> classify_emails
    classify_emails -> process_urgent_emails
    classify_emails -> process_regular_emails
    process_urgent_emails -> send_responses
    process_regular_emails -> send_responses
```
Taskinity includes a variety of examples in the `examples/` directory. Each example is self-contained with its own README, configuration files, and Docker setup where applicable.
```python
from taskinity import task, run_flow_from_dsl

@task(name="Fetch Emails")
def fetch_emails(server, username, password):
    # Implementation
    return ["Email 1", "Email 2"]

@task(name="Classify Emails")
def classify_emails(emails):
    # Implementation
    urgent = [e for e in emails if "URGENT" in e]
    regular = [e for e in emails if "URGENT" not in e]
    return {"urgent_emails": urgent, "regular_emails": regular}

# Flow definition using DSL
email_dsl = """
flow EmailProcessing:
    description: "Email Processing Flow"
    fetch_emails -> classify_emails
    classify_emails -> process_urgent_emails
    classify_emails -> process_regular_emails
"""
```
```python
from taskinity import task, run_flow_from_dsl

def validate_input_data(data):
    if not isinstance(data, list):
        raise ValueError("Input data must be a list")

@task(name="Analyze Data", validate_input=validate_input_data)
def analyze_data(data):
    return {"summary": sum(data), "average": sum(data) / len(data)}
```
Taskinity includes simple tools for flow visualization:
```bash
# Visualize DSL definition
python visualize_flow.py dsl --file email_processing.dsl --output flow_diagram.png

# Visualize flow execution history
python visualize_flow.py flow [flow_id] --output execution_diagram.png
```
Example ASCII diagram:
```
=== EmailProcessing ===

[fetch_emails]
[classify_emails]
[process_urgent_emails]
[process_regular_emails]
[send_responses]

Connections:
fetch_emails --> classify_emails
classify_emails --> process_urgent_emails
classify_emails --> process_regular_emails
process_urgent_emails --> send_responses
process_regular_emails --> send_responses
```
Taskinity automatically saves flow execution logs in the `logs/` directory and flow run records as JSON files in `flows/`. They can be easily viewed using standard tools:
```python
# View logs for a specific flow
import json
from pathlib import Path

def view_flow_logs(flow_id):
    flow_file = Path("flows") / f"{flow_id}.json"
    if flow_file.exists():
        with open(flow_file, "r") as f:
            flow_data = json.load(f)
        print(f"Flow: {flow_data['name']} (Status: {flow_data['status']})")
        print(f"Duration: {flow_data.get('duration', 'N/A')} seconds")
```
| Criterion | Taskinity | Prefect | Airflow | Luigi |
|---|---|---|---|---|
| Project Type | Lightweight flows | Complex orchestration | Complex ETL | Simple ETL |
| Syntax | DSL + decorators | `@flow`/`@task` decorators | Classes with DAG | Classes with `run()` |
| Dependencies | None | `prefect>=2.0` | `apache-airflow` | `luigi` |
| Observability | Basic logs + UI | Grafana/Prometheus | Built-in UI | Text logs |
| Data Validation | Custom functions | Pydantic types | None | None |
| Parallelism | Threads (future) | Threads/Processes | Executor | Sequential |
| Setup Time | < 1 minute | 15-30 minutes | 30-60 minutes | 5-10 minutes |
| Learning Curve | Very flat | Moderate | Steep | Moderate |
```python
from taskinity import task, run_flow_from_dsl

@task(name="Fetch emails")
def fetch_emails(server: str) -> list:
    # Implementation (placeholder)
    emails = ["URGENT: server down", "Weekly newsletter"]
    return emails

@task(name="Classify")
def classify(emails: list) -> dict:
    # Email classification
    urgent = [e for e in emails if "URGENT" in e]
    regular = [e for e in emails if "URGENT" not in e]
    return {"urgent": urgent, "regular": regular}

flow = """
flow EmailFlow:
    fetch_emails -> classify
"""
```
Taskinity offers two types of dashboards for flow monitoring:
A simple, lightweight dashboard with log history view and quick diagram preview:

```bash
python mini_dashboard.py
```

An extended dashboard with full functionality:

```bash
python simple_dashboard.py
```
Taskinity offers a notification system for flow status via email and Slack:
```bash
# Edit notification configuration
python -c "from notification_service import load_config, save_config; config = load_config(); config['enabled'] = True; save_config(config)"
```
Taskinity enables parallel execution of independent tasks in a flow:
```python
# Run flow with parallel execution
from taskinity.parallel_executor import run_parallel_flow_from_dsl

result = run_parallel_flow_from_dsl(dsl_content, input_data)
```
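In the email flow above, for instance, `process_urgent_emails` and `process_regular_emails` do not depend on each other, so the parallel executor can run them concurrently (a sketch reusing `email_dsl` from the examples section; the credentials are placeholders):

```python
from taskinity.parallel_executor import run_parallel_flow_from_dsl

# classify_emails fans out to two independent branches, which can run in parallel
results = run_parallel_flow_from_dsl(email_dsl, {
    "server": "imap.example.com",  # hypothetical credentials
    "username": "user",
    "password": "secret",
})
```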
Taskinity allows scheduling automatic flow execution:
```bash
# Start the scheduler
python flow_scheduler.py start

# Create a schedule (every 60 minutes)
python flow_scheduler.py create dsl_definitions/email_processing.dsl 60
```
`@task` decorator:

```python
@task(name=None, description=None, validate_input=None, validate_output=None)
def my_task():
    pass
```

`@flow` decorator:

```python
@flow(name=None, description=None)
def my_flow():
    pass
```
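A combined sketch of both decorators in use (reusing the validator from the data validation section; the `flow` import path and the assumption that a `@flow`-decorated function orchestrates tasks by calling them directly are not confirmed by this README):

```python
from taskinity import task, flow  # assumed import path for flow

@task(name="Analyze Data", description="Summarize a list of numbers",
      validate_input=validate_input_data)
def analyze_data(data):
    return {"summary": sum(data), "average": sum(data) / len(data)}

@flow(name="Analysis Flow", description="Run the analysis end to end")
def analysis_flow():
    return analyze_data([1, 2, 3])
```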
```python
# Parse DSL text into a structured flow definition
parse_dsl(dsl_text: str) -> Dict[str, Any]

# Run a flow defined in DSL
run_flow_from_dsl(dsl_text: str, input_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
```
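For example (a sketch; the exact keys of the returned dictionary are not documented here, so the comment is an assumption):

```python
definition = parse_dsl("""
flow DataProcessing:
    description: "Data Processing Flow"
    fetch_data -> process_data
""")
print(definition)  # structured flow definition: name, description, task connections
```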
Taskinity can be extended with additional functionality through plugins:
```python
# Register a custom plugin
from taskinity.extensions import register_plugin

register_plugin("my_plugin", MyPluginClass)
```
Available plugins: