threading: Schedule TILE tasks for all passes at once

Closes #465.
This commit is contained in:
Victorien Le Couviour--Tuffet
2026-04-27 21:09:28 +02:00
parent c0f2fe3135
commit f995e1fbf9
+31 -48
View File
@@ -270,27 +270,35 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
{
Dav1dTask *tasks = f->task_thread.tile_tasks[0];
const int uses_2pass = f->c->n_fc > 1;
const int num_tasks = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
const int n_tasks_per_pass = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
const int n_tasks = n_tasks_per_pass * (1 + uses_2pass);
if (pass < 2) {
int alloc_num_tasks = num_tasks * (1 + uses_2pass);
if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
if (n_tasks > f->task_thread.num_tile_tasks) {
const size_t size = sizeof(Dav1dTask) * n_tasks;
tasks = dav1d_realloc(ALLOC_COMMON_CTX, f->task_thread.tile_tasks[0], size);
if (!tasks) return -1;
memset(tasks, 0, size);
f->task_thread.tile_tasks[0] = tasks;
f->task_thread.num_tile_tasks = alloc_num_tasks;
f->task_thread.num_tile_tasks = n_tasks;
}
f->task_thread.tile_tasks[1] = tasks + num_tasks;
f->task_thread.tile_tasks[1] = tasks + n_tasks_per_pass;
}
tasks += num_tasks * (pass & 1);
assert(n_tasks <= f->task_thread.num_tile_tasks);
Dav1dTask *pf_t;
if (create_filter_sbrow(f, pass, &pf_t))
return -1;
Dav1dTask *const p1_tasks = f->task_thread.tile_tasks[1];
Dav1dTask *prev_t = NULL;
for (int tile_idx = 0; tile_idx < num_tasks; tile_idx++) {
if (pass == 2) {
prev_t = &p1_tasks[n_tasks_per_pass - 1];
// PF task is scheduled after the last sby=0 TILE task
if (f->frame_hdr->tiling.rows == 1)
prev_t = prev_t->next;
}
tasks += (pass & 1) * n_tasks_per_pass;
for (int tile_idx = 0; tile_idx < n_tasks_per_pass; tile_idx++) {
Dav1dTileState *const ts = &f->ts[tile_idx];
Dav1dTask *t = &tasks[tile_idx];
t->sby = ts->tiling.row_start >> f->sb_shift;
@@ -319,17 +327,15 @@ int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
// XXX in theory this could be done locklessly: at this point there are no
// tasks in the frameQ, so no other runner should be using this lock, but
// we must add both passes at once
pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
assert(f->task_thread.pending_tasks.head == NULL || pass == 2);
if (!f->task_thread.pending_tasks.head)
f->task_thread.pending_tasks.head = &tasks[0];
else
f->task_thread.pending_tasks.tail->next = &tasks[0];
f->task_thread.pending_tasks.tail = prev_t;
atomic_store(&f->task_thread.pending_tasks.merge, 1);
atomic_store(&f->task_thread.init_done, 1);
pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
if (!(pass & 1)) {
pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
assert(f->task_thread.pending_tasks.head == NULL);
f->task_thread.pending_tasks.head = f->task_thread.tile_tasks[pass == 2];
f->task_thread.pending_tasks.tail = prev_t;
atomic_store(&f->task_thread.pending_tasks.merge, 1);
atomic_store(&f->task_thread.init_done, 1);
pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
}
return 0;
}
@@ -712,36 +718,13 @@ void *dav1d_worker_task(void *data) {
int res = DAV1D_ERR(EINVAL);
if (!atomic_load(&f->task_thread.error))
res = dav1d_decode_frame_init_cdf(f);
if (f->frame_hdr->refresh_context && !f->task_thread.update_set) {
if (f->frame_hdr->refresh_context && !f->task_thread.update_set)
atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1);
}
if (!res) {
assert(c->n_fc > 1);
for (int p = 1; p <= 2; p++) {
const int res = dav1d_task_create_tile_sbrow(f, p, 0);
if (res) {
pthread_mutex_lock(&ttd->lock);
// memory allocation failed
atomic_store(&f->task_thread.done[2 - p], 1);
atomic_store(&f->task_thread.error, -1);
atomic_fetch_sub(&f->task_thread.task_counter,
f->frame_hdr->tiling.cols *
f->frame_hdr->tiling.rows + f->sbh);
atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR);
if (p == 2 && atomic_load(&f->task_thread.done[1])) {
assert(!atomic_load(&f->task_thread.task_counter));
dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM));
f->n_tile_data = 0;
pthread_cond_signal(&f->task_thread.cond);
} else {
pthread_mutex_unlock(&ttd->lock);
}
}
}
pthread_mutex_lock(&ttd->lock);
} else {
pthread_mutex_lock(&ttd->lock);
abort_frame(f, res);
for (int p = 1; p <= 2 && !res; p++)
res = dav1d_task_create_tile_sbrow(f, p, 0);
pthread_mutex_lock(&ttd->lock);
if (res) {
abort_frame(f, DAV1D_ERR(ENOMEM));
reset_task_cur(c, ttd, t->frame_idx);
atomic_store(&f->task_thread.init_done, 1);
}