Task Queue Priority and Fairness
Task Queue priority
Task Queue priority allows you to control the execution order of Workflows, Activities, and child Workflows based on assigned priority values within a single Task Queue. You can select a priority level in the integer range [1,5]. A lower value implies higher priority, so 1 is the highest priority level. The default priority if unspecified is 3.
If you're using Temporal Cloud, contact Temporal support or your Temporal account team to enable this feature for your cloud Namespaces.
If you're self-hosting Temporal, use the latest pre-release development server and set matching.useNewMatcher to true in the dynamic config on the relevant Task Queues or Namespaces.
Choose your SDK below to see an example of setting priority for your Workflows:
workflowOptions := client.StartWorkflowOptions{
ID: "my-workflow-id",
TaskQueue: "my-task-queue",
Priority: temporal.Priority{PriorityKey: 5},
}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow)
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("my-task-queue")
.setPriority(Priority.newBuilder().setPriorityKey(5).build())
.build();
WorkflowClient client = WorkflowClient.newInstance(service);
MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);
workflow.run();
await client.start_workflow(
MyWorkflow.run,
args="hello",
id="my-workflow-id",
task_queue="my-task-queue",
priority=Priority(priority_key=1),
)
var handle = await Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync("hello"),
new StartWorkflowOptions(
id: "my-workflow-id",
taskQueue: "my-task-queue"
)
{
Priority = new Priority(1),
}
);
Choose your SDK below to see an example of setting priority for your Activities:
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
Priority: temporal.Priority{PriorityKey: 3},
}
ctx := workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
ActivityOptions options = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(1))
.setPriority(Priority.newBuilder().setPriorityKey(3).build())
.build();
MyActivity activity = Workflow.newActivityStub(MyActivity.class, options);
activity.perform();
await workflow.execute_activity(
say_hello,
"hi",
priority=Priority(priority_key=3),
start_to_close_timeout=timedelta(seconds=5),
)
await Workflow.ExecuteActivityAsync(
() => SayHello("hi"),
new()
{
StartToCloseTimeout = TimeSpan.FromSeconds(5),
Priority = new(3),
}
);
Choose your SDK below to see an example of setting priority for your Child Workflows:
cwo := workflow.ChildWorkflowOptions{
WorkflowID: "child-workflow-id",
TaskQueue: "child-task-queue",
Priority: temporal.Priority{PriorityKey: 1},
}
ctx := workflow.WithChildOptions(ctx, cwo)
err := workflow.ExecuteChildWorkflow(ctx, MyChildWorkflow).Get(ctx, nil)
ChildWorkflowOptions childOptions = ChildWorkflowOptions.newBuilder()
.setTaskQueue("child-task-queue")
.setWorkflowId("child-workflow-id")
.setPriority(Priority.newBuilder().setPriorityKey(1).build())
.build();
MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, childOptions);
child.run();
await workflow.execute_child_workflow(
MyChildWorkflow.run,
args="hello child",
priority=Priority(priority_key=1),
)
await Workflow.ExecuteChildWorkflowAsync(
(MyChildWorkflow wf) => wf.RunAsync("hello child"),
new() { Priority = new(1) });
Task Queue fairness
Task Queue fairness allows for more control over the order that Tasks are dispatched from a backlog. It’s intended to address common situations like, multi-tenant applications and reserved capacity bands.
Fairness creates multiple “virtual queues”, one for each fairness key, within a Task Queue that are dispatched in a weighted-round-robin manner. This lets you handle scenarios like:
- Multi-tenant applications with big and small tenants where small tenants should not not be blocked by big ones and any single user should be able to use all available resources rather than being rate limited at some fixed level.
- Assign Tasks to bands and then dispatch 80% from one band and 20% from another, for example, without limiting overall capacity when one band is empty.
To enable this feature, use the latest pre-release development server and set matching.enableFairness to true in the dynamic config on the relevant Task Queues or Namespaces.
For Public Preview, fairness cannot be enabled for active Task Queues. The Task Queues have to be new or idle and there can't be any running Workflows. Once fairness is enabled, all existing backlog Tasks in the Task Queue will be abandoned.
How it works
Each priority level acts as a separate virtual Task Queue. Fairness attempts to distribute all the Tasks from a higher priority level first before distributing any from a lower priority. There are fairness keys which are the attribute used to distinguish Tasks in the context of fair dispatch. There are also fairness weights which is the weight assigned to a fairness key to allow for unequal distribution among tenants or for allocating fractions of capacity to different levels or types of Tasks.
Fairness applies within each priority level. It sequences Tasks in the Task Queue probabilistically using a weighted round-robin, based on:
- Fairness weights the customer sets
- The current backlog of Tasks
- A data structure that tracks how Tasks are distributed for different fairness keys
When to use fairness
Fairness applies to backlogged Tasks when there isn't sufficient Worker capacity to dispatch Tasks immediately. If all Tasks can be dispatched immediately, the you don't need to use fairness.
Fairness applies at Task dispatch time based on information about the Tasks passing through the Task Queue and considers each Task as having equal cost. It doesn't consider any Task execution that is currently being done by Workers. So if you look at Tasks being process by Workers, you might not see "fairness" across tenants.
Limitations of fairness
When you use Worker Versioning and you're moving Workflows from one version to another, the ordering of Workflow Tasks that are moved to the next version is undefined. Tasks redirected to a new Worker version may not be treated fairly with respect to each other or Tasks that aren't redirected.
There isn't a limit on the number of fairness keys you can use, but their accuracy can degrade as you add more fairness keys.
Task Queues are internally partitioned and Tasks are distributed to partitions randomly. This could interfer with fairnesss. Depending on your use case, you can reach out to Temporal support to get your Task Queues set to a single partition.
Choose your SDK below to see an example of setting fairness for your Workflows:
workflowOptions := client.StartWorkflowOptions{
ID: "my-workflow-id",
TaskQueue: "my-task-queue",
Priority: temporal.Priority{
PriorityKey: 1,
FairnessKey: "a-key",
FairnessWeight: 3.14,
},
}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow)
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("my-task-queue")
.setPriority(Priority.newBuilder().setPriorityKey(5).setFairnessKey("a-key").setFairnessWeight(3.14).build())
.build();
WorkflowClient client = WorkflowClient.newInstance(service);
MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);
workflow.run();
await client.start_workflow(
MyWorkflow.run,
args="hello",
id="my-workflow-id",
task_queue="my-task-queue",
priority=Priority(priority_key=3, fairness_key="a-key", fairness_weight=3.14),
)
const handle = await startWorkflow(workflows.priorityWorkflow, {
args: [false, 1],
priority: { priorityKey: 3, fairnessKey: 'a-key', fairnessWeight: 3.14 },
});
var handle = await Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync("hello"),
new StartWorkflowOptions(
id: "my-workflow-id",
taskQueue: "my-task-queue"
)
{
Priority = new Priority(
priorityKey: 3,
fairnessKey: "a-key",
fairnessWeight: 3.14
)
}
);
client.start_workflow(
MyWorkflow, "input-arg",
id: "my-workflow-id",
task_queue: "my-task-queue",
priority: Temporalio::Priority.new(
priority_key: 3,
fairness_key: "a-key",
fairness_weight: 3.14
)
)
Choose your SDK below to see an example of setting fairness for your Activities:
ao := workflow.ActivityOptions{
Stardd
}
ctx := workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
ActivityOptions options = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(1))
.setPriority(Priority.newBuilder().setPriorityKey(3).setFairnessKey("a-key").setFairnessWeight(3.14).build())
.build();
MyActivity activity = Workflow.newActivityStub(MyActivity.class, options);
activity.perform();
await workflow.execute_activity(
say_hello,
"hi",
priority=Priority(priority_key=3, fairness_key="a-key", fairness_weight=3.14),
start_to_close_timeout=timedelta(seconds=5),
)
const handle = await startWorkflow(workflows.priorityWorkflow, {
args: [false, 1],
priority: { priorityKey: 3, fairnessKey: 'a-key', fairnessWeight: 3.14 },
});
var handle = await Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync("hello"),
new StartWorkflowOptions(
id: "my-workflow-id",
taskQueue: "my-task-queue"
)
{
Priority = new Priority(
priorityKey: 3,
fairnessKey: "a-key",
fairnessWeight: 3.14
)
}
);
client.start_activity(
MyActivity, "input-arg",
id: "my-workflow-id",
task_queue: "my-task-queue",
priority: Temporalio::Priority.new(
priority_key: 3,
fairness_key: "a-key",
fairness_weight: 3.14
)
)
Choose your SDK below to see an example of setting fairness for your Child Workflows:
cwo := workflow.ChildWorkflowOptions{
WorkflowID: "child-workflow-id",
TaskQueue: "child-task-queue",
Priority: temporal.Priority{
PriorityKey: 1,
FairnessKey: "a-key",
FairnessWeight: 3.14,
},
}
ctx := workflow.WithChildOptions(ctx, cwo)
err := workflow.ExecuteChildWorkflow(ctx, MyChildWorkflow).Get(ctx, nil)
ChildWorkflowOptions childOptions = ChildWorkflowOptions.newBuilder()
.setTaskQueue("child-task-queue")
.setWorkflowId("child-workflow-id")
.setPriority(Priority.newBuilder().setPriorityKey(1).setFairnessKey("a-key").setFairnessWeight(3.14).build())
.build();
MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, childOptions);
child.run();
await workflow.execute_child_workflow(
MyChildWorkflow.run,
args="hello child",
priority=Priority(priority_key=3, fairness_key="a-key", fairness_weight=3.14),
)
const handle = await startChildWorkflow(workflows.priorityWorkflow, {
args: [false, 1],
priority: { priorityKey: 3, fairnessKey: 'a-key', fairnessWeight: 3.14 },
});
var handle = await Client.StartWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync("hello"),
new StartWorkflowOptions(
id: "my-workflow-id",
taskQueue: "my-task-queue"
)
{
Priority = new Priority(
priorityKey: 3,
fairnessKey: "a-key",
fairnessWeight: 3.14
)
}
);
client.start_child_workflow(
MyChildWorkflow, "input-arg",
id: "my-child-workflow-id",
task_queue: "my-task-queue",
priority: Temporalio::Priority.new(
priority_key: 3,
fairness_key: "a-key",
fairness_weight: 3.14
)
)