aboutsummaryrefslogtreecommitdiff
path: root/io_uring/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--io_uring/io_uring.c75
1 files changed, 50 insertions, 25 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 801293399883..06ff41484e29 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -121,6 +121,7 @@
#define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8
+#define IO_LOCAL_TW_DEFAULT_MAX 20
struct io_defer_entry {
struct list_head list;
@@ -1255,12 +1256,14 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
struct llist_node *node = llist_del_all(&ctx->work_llist);
__io_fallback_tw(node, false);
+ node = llist_del_all(&ctx->retry_llist);
+ __io_fallback_tw(node, false);
}
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
int min_events)
{
- if (llist_empty(&ctx->work_llist))
+ if (!io_local_work_pending(ctx))
return false;
if (events < min_events)
return true;
@@ -1269,8 +1272,29 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
return false;
}
+static int __io_run_local_work_loop(struct llist_node **node,
+ struct io_tw_state *ts,
+ int events)
+{
+ int ret = 0;
+
+ while (*node) {
+ struct llist_node *next = (*node)->next;
+ struct io_kiocb *req = container_of(*node, struct io_kiocb,
+ io_task_work.node);
+ INDIRECT_CALL_2(req->io_task_work.func,
+ io_poll_task_func, io_req_rw_complete,
+ req, ts);
+ *node = next;
+ if (++ret >= events)
+ break;
+ }
+
+ return ret;
+}
+
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
- int min_events)
+ int min_events, int max_events)
{
struct llist_node *node;
unsigned int loops = 0;
@@ -1281,25 +1305,23 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
+ min_events -= ret;
+ ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, max_events);
+ if (ctx->retry_llist.first)
+ goto retry_done;
+
/*
* llists are in reverse order, flip it back the right way before
* running the pending items.
*/
node = llist_reverse_order(llist_del_all(&ctx->work_llist));
- while (node) {
- struct llist_node *next = node->next;
- struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.node);
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- req, ts);
- ret++;
- node = next;
- }
+ ret += __io_run_local_work_loop(&node, ts, max_events - ret);
+ ctx->retry_llist.first = node;
loops++;
if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
+retry_done:
io_submit_flush_completions(ctx);
if (io_run_local_work_continue(ctx, ret, min_events))
goto again;
@@ -1313,18 +1335,20 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
{
struct io_tw_state ts = {};
- if (llist_empty(&ctx->work_llist))
+ if (!io_local_work_pending(ctx))
return 0;
- return __io_run_local_work(ctx, &ts, min_events);
+ return __io_run_local_work(ctx, &ts, min_events,
+ max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}
-static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
+static int io_run_local_work(struct io_ring_ctx *ctx, int min_events,
+ int max_events)
{
struct io_tw_state ts = {};
int ret;
mutex_lock(&ctx->uring_lock);
- ret = __io_run_local_work(ctx, &ts, min_events);
+ ret = __io_run_local_work(ctx, &ts, min_events, max_events);
mutex_unlock(&ctx->uring_lock);
return ret;
}
@@ -2328,9 +2352,9 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
- if (!llist_empty(&ctx->work_llist)) {
+ if (io_local_work_pending(ctx)) {
__set_current_state(TASK_RUNNING);
- if (io_run_local_work(ctx, INT_MAX) > 0)
+ if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
return 0;
}
if (io_run_task_work() > 0)
@@ -2459,7 +2483,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
{
if (unlikely(READ_ONCE(ctx->check_cq)))
return 1;
- if (unlikely(!llist_empty(&ctx->work_llist)))
+ if (unlikely(io_local_work_pending(ctx)))
return 1;
if (unlikely(task_work_pending(current)))
return 1;
@@ -2493,8 +2517,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
if (!io_allowed_run_tw(ctx))
return -EEXIST;
- if (!llist_empty(&ctx->work_llist))
- io_run_local_work(ctx, min_events);
+ if (io_local_work_pending(ctx))
+ io_run_local_work(ctx, min_events,
+ max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
io_run_task_work();
if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
@@ -2564,8 +2589,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
* If we got woken because of task_work being processed, run it
* now rather than let the caller do another wait loop.
*/
- if (!llist_empty(&ctx->work_llist))
- io_run_local_work(ctx, nr_wait);
+ if (io_local_work_pending(ctx))
+ io_run_local_work(ctx, nr_wait, nr_wait);
io_run_task_work();
/*
@@ -3077,7 +3102,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
io_allowed_defer_tw_run(ctx))
- ret |= io_run_local_work(ctx, INT_MAX) > 0;
+ ret |= io_run_local_work(ctx, INT_MAX, INT_MAX) > 0;
ret |= io_cancel_defer_files(ctx, tctx, cancel_all);
mutex_lock(&ctx->uring_lock);
ret |= io_poll_remove_all(ctx, tctx, cancel_all);
@@ -3158,7 +3183,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
io_run_task_work();
io_uring_drop_tctx_refs(current);
xa_for_each(&tctx->xa, index, node) {
- if (!llist_empty(&node->ctx->work_llist)) {
+ if (io_local_work_pending(node->ctx)) {
WARN_ON_ONCE(node->ctx->submitter_task &&
node->ctx->submitter_task != current);
goto end_wait;