Now that creating workqueues doesn't cost much and less direct
manipulation of workers is allowed, it might make sense to update
the padata interface such that it allocates workqueues with proper
flags itself.
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Herbert Xu <her...@gondor.apana.org.au>
Cc: David S. Miller <da...@davemloft.net>
Cc: Steffen Klassert <steffen....@secunet.com>
---
crypto/pcrypt.c | 4 ++--
1 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/crypto/pcrypt.c b/crypto/pcrypt.c
index 247178c..4728cdc 100644
--- a/crypto/pcrypt.c
+++ b/crypto/pcrypt.c
@@ -385,11 +385,11 @@ static struct crypto_template pcrypt_tmpl = {
static int __init pcrypt_init(void)
{
- encwq = create_workqueue("pencrypt");
+ encwq = alloc_workqueue("pencrypt", WQ_HIGHPRI | WQ_CPU_INTENSIVE, 1);
if (!encwq)
goto err;
- decwq = create_workqueue("pdecrypt");
+ decwq = alloc_workqueue("pdecrypt", WQ_HIGHPRI | WQ_CPU_INTENSIVE, 1);
if (!decwq)
goto err_destroy_encwq;
--
1.6.4.2
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majo...@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
This is in preparation for concurrency managed workqueue, where
multiple shared workers would be available per cpu.
Signed-off-by: Tejun Heo <t...@kernel.org>
---
kernel/workqueue.c | 211 +++++++++++++++++++++++++++++++++++++---------------
1 files changed, 150 insertions(+), 61 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index eeec736..0b0c360 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -33,6 +33,7 @@
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
+#include <linux/idr.h>
/*
* Structure fields follow one of the following exclusion rules.
@@ -46,6 +47,15 @@
* W: workqueue_lock protected.
*/
+struct cpu_workqueue_struct;
+
+struct worker {
+ struct work_struct *current_work; /* L: work being processed */
+ struct task_struct *task; /* I: worker task */
+ struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
+ int id; /* I: worker id */
+};
+
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of
@@ -58,15 +68,14 @@ struct cpu_workqueue_struct {
struct list_head worklist;
wait_queue_head_t more_work;
- struct work_struct *current_work;
unsigned int cpu;
+ struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
- struct task_struct *thread;
};
/*
@@ -214,6 +223,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
+static DEFINE_PER_CPU(struct ida, worker_ida);
+
+static int worker_thread(void *__worker);
static int singlethread_cpu __read_mostly;
@@ -428,6 +440,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
+static struct worker *alloc_worker(void)
+{
+ struct worker *worker;
+
+ worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+ return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @cwq. The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep. Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+ int id = -1;
+ struct worker *worker = NULL;
+
+ spin_lock(&workqueue_lock);
+ while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
+ spin_unlock(&workqueue_lock);
+ if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+ goto fail;
+ spin_lock(&workqueue_lock);
+ }
+ spin_unlock(&workqueue_lock);
+
+ worker = alloc_worker();
+ if (!worker)
+ goto fail;
+
+ worker->cwq = cwq;
+ worker->id = id;
+
+ worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+ cwq->cpu, id);
+ if (IS_ERR(worker->task))
+ goto fail;
+
+ if (bind)
+ kthread_bind(worker->task, cwq->cpu);
+
+ return worker;
+fail:
+ if (id >= 0) {
+ spin_lock(&workqueue_lock);
+ ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
+ spin_unlock(&workqueue_lock);
+ }
+ kfree(worker);
+ return NULL;
+}
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Start @worker.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+ wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker.
+ */
+static void destroy_worker(struct worker *worker)
+{
+ int cpu = worker->cwq->cpu;
+ int id = worker->id;
+
+ /* sanity check frenzy */
+ BUG_ON(worker->current_work);
+
+ kthread_stop(worker->task);
+ kfree(worker);
+
+ spin_lock(&workqueue_lock);
+ ida_remove(&per_cpu(worker_ida, cpu), id);
+ spin_unlock(&workqueue_lock);
+}
+
/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
@@ -468,7 +579,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
/**
* process_one_work - process single work
- * @cwq: cwq to process work for
+ * @worker: self
* @work: work to process
*
* Process @work. This function contains all the logics necessary to
@@ -480,9 +591,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
* CONTEXT:
* spin_lock_irq(cwq->lock) which is released and regrabbed.
*/
-static void process_one_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work)
+static void process_one_work(struct worker *worker, struct work_struct *work)
{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
work_func_t f = work->func;
int work_color;
#ifdef CONFIG_LOCKDEP
@@ -497,7 +608,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
#endif
/* claim and process */
debug_work_deactivate(work);
- cwq->current_work = work;
+ worker->current_work = work;
work_color = get_work_color(work);
list_del_init(&work->entry);
@@ -524,30 +635,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
spin_lock_irq(&cwq->lock);
/* we're done with it, release */
- cwq->current_work = NULL;
+ worker->current_work = NULL;
cwq_dec_nr_in_flight(cwq, work_color);
}
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct worker *worker)
{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
+
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
- process_one_work(cwq, work);
+ process_one_work(worker, work);
}
spin_unlock_irq(&cwq->lock);
}
/**
* worker_thread - the worker thread function
- * @__cwq: cwq to serve
+ * @__worker: self
*
* The cwq worker thread function.
*/
-static int worker_thread(void *__cwq)
+static int worker_thread(void *__worker)
{
- struct cpu_workqueue_struct *cwq = __cwq;
+ struct worker *worker = __worker;
+ struct cpu_workqueue_struct *cwq = worker->cwq;
DEFINE_WAIT(wait);
if (cwq->wq->flags & WQ_FREEZEABLE)
@@ -566,11 +680,11 @@ static int worker_thread(void *__cwq)
if (kthread_should_stop())
break;
- if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
+ if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
get_cpu_mask(cwq->cpu))))
- set_cpus_allowed_ptr(cwq->thread,
+ set_cpus_allowed_ptr(worker->task,
get_cpu_mask(cwq->cpu));
- run_workqueue(cwq);
+ run_workqueue(worker);
}
return 0;
@@ -873,7 +987,7 @@ int flush_work(struct work_struct *work)
goto already_gone;
prev = &work->entry;
} else {
- if (cwq->current_work != work)
+ if (!cwq->worker || cwq->worker->current_work != work)
goto already_gone;
prev = &cwq->worklist;
}
@@ -937,7 +1051,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
int running = 0;
spin_lock_irq(&cwq->lock);
- if (unlikely(cwq->current_work == work)) {
+ if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
insert_wq_barrier(cwq, &barr, cwq->worklist.next);
running = 1;
}
@@ -1225,7 +1339,7 @@ int current_is_keventd(void)
BUG_ON(!keventd_wq);
cwq = get_cwq(cpu, keventd_wq);
- if (current == cwq->thread)
+ if (current == cwq->worker->task)
ret = 1;
return ret;
@@ -1273,38 +1387,6 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs)
#endif
}
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct workqueue_struct *wq = cwq->wq;
- struct task_struct *p;
-
- p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
- /*
- * Nobody can add the work_struct to this cwq,
- * if (caller is __create_workqueue)
- * nobody should see this wq
- * else // caller is CPU_UP_PREPARE
- * cpu is not on cpu_online_map
- * so we can abort safely.
- */
- if (IS_ERR(p))
- return PTR_ERR(p);
- cwq->thread = p;
-
- return 0;
-}
-
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct task_struct *p = cwq->thread;
-
- if (p != NULL) {
- if (cpu >= 0)
- kthread_bind(p, cpu);
- wake_up_process(p);
- }
-}
-
struct workqueue_struct *__create_workqueue_key(const char *name,
unsigned int flags,
struct lock_class_key *key,
@@ -1312,7 +1394,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
{
bool singlethread = flags & WQ_SINGLE_THREAD;
struct workqueue_struct *wq;
- int err = 0, cpu;
+ bool failed = false;
+ unsigned int cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
@@ -1342,20 +1425,21 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
- cwq->wq = wq;
cwq->cpu = cpu;
+ cwq->wq = wq;
cwq->flush_color = -1;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
- if (err)
+ if (failed)
continue;
- err = create_workqueue_thread(cwq, cpu);
- if (cpu_online(cpu) && !singlethread)
- start_workqueue_thread(cwq, cpu);
+ cwq->worker = create_worker(cwq,
+ cpu_online(cpu) && !singlethread);
+ if (cwq->worker)
+ start_worker(cwq->worker);
else
- start_workqueue_thread(cwq, -1);
+ failed = true;
}
spin_lock(&workqueue_lock);
@@ -1364,7 +1448,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
cpu_maps_update_done();
- if (err) {
+ if (failed) {
destroy_workqueue(wq);
wq = NULL;
}
@@ -1400,9 +1484,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
int i;
- if (cwq->thread) {
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
+ if (cwq->worker) {
+ destroy_worker(cwq->worker);
+ cwq->worker = NULL;
}
for (i = 0; i < WORK_NR_COLORS; i++)
@@ -1489,6 +1573,8 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
void __init init_workqueues(void)
{
+ unsigned int cpu;
+
/*
* cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
* Make sure that the alignment isn't lower than that of
@@ -1497,6 +1583,9 @@ void __init init_workqueues(void)
BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
__alignof__(unsigned long long));
+ for_each_possible_cpu(cpu)
+ ida_init(&per_cpu(worker_ida, cpu));
+
singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
* A work queued to it is queued at the head of the worklist of the
respective gcwq after other highpri works, while normal works are
always appended at the end.
* As long as there are highpri works on gcwq->worklist,
[__]need_more_worker() remains %true and process_one_work() wakes up
another worker before it starts executing a work.
The above two properties guarantee that works queued to high priority
workqueues are dispatched to workers and start execution as soon as
possible regardless of the state of other works.
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Andi Kleen <an...@firstfloor.org>
Cc: Andrew Morton <ak...@linux-foundation.org>
---
include/linux/workqueue.h | 1 +
kernel/workqueue.c | 70 +++++++++++++++++++++++++++++++++++++++++----
2 files changed, 65 insertions(+), 6 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 0a7f797..006dcf7 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -231,6 +231,7 @@ enum {
WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
WQ_NON_REENTRANT = 1 << 2, /* guarantee non-reentrance */
WQ_RESCUER = 1 << 3, /* has an rescue worker */
+ WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
WQ_DFL_ACTIVE = WQ_MAX_ACTIVE / 2,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 979f893..e12f9aa 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -43,6 +43,7 @@ enum {
GCWQ_MANAGING_WORKERS = 1 << 1, /* managing workers */
GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
GCWQ_FREEZING = 1 << 3, /* freeze in progress */
+ GCWQ_HIGHPRI_PENDING = 1 << 4, /* highpri works on queue */
/* worker flags */
WORKER_STARTED = 1 << 0, /* started */
@@ -452,15 +453,19 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
* assume that they're being called with gcwq->lock held.
*/
+static bool __need_more_worker(struct global_cwq *gcwq)
+{
+ return !atomic_read(get_gcwq_nr_running(gcwq->cpu)) ||
+ gcwq->flags & GCWQ_HIGHPRI_PENDING;
+}
+
/*
* Need to wake up a worker? Called from anything but currently
* running workers.
*/
static bool need_more_worker(struct global_cwq *gcwq)
{
- atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
-
- return !list_empty(&gcwq->worklist) && !atomic_read(nr_running);
+ return !list_empty(&gcwq->worklist) && __need_more_worker(gcwq);
}
/* Can I start working? Called from busy but !running workers. */
@@ -734,6 +739,43 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
}
/**
+ * gcwq_determine_ins_pos - find insertion position
+ * @gcwq: gcwq of interest
+ * @cwq: cwq a work is being queued for
+ *
+ * A work for @cwq is about to be queued on @gcwq, determine insertion
+ * position for the work. If @cwq is for HIGHPRI wq, the work is
+ * queued at the head of the queue but in FIFO order with respect to
+ * other HIGHPRI works; otherwise, at the end of the queue. This
+ * function also sets GCWQ_HIGHPRI_PENDING flag to hint @gcwq that
+ * there are HIGHPRI works pending.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to insertion position.
+ */
+static inline struct list_head *gcwq_determine_ins_pos(struct global_cwq *gcwq,
+ struct cpu_workqueue_struct *cwq)
+{
+ struct work_struct *twork;
+
+ if (likely(!(cwq->wq->flags & WQ_HIGHPRI)))
+ return &gcwq->worklist;
+
+ list_for_each_entry(twork, &gcwq->worklist, entry) {
+ struct cpu_workqueue_struct *tcwq = get_work_cwq(twork);
+
+ if (!(tcwq->wq->flags & WQ_HIGHPRI))
+ break;
+ }
+
+ gcwq->flags |= GCWQ_HIGHPRI_PENDING;
+ return &twork->entry;
+}
+
+/**
* insert_work - insert a work into gcwq
* @cwq: cwq @work belongs to
* @work: work to insert
@@ -770,7 +812,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
*/
smp_mb();
- if (!atomic_read(get_gcwq_nr_running(gcwq->cpu)))
+ if (__need_more_worker(gcwq))
wake_up_worker(gcwq);
}
@@ -887,7 +929,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
if (likely(cwq->nr_active < cwq->max_active)) {
cwq->nr_active++;
- worklist = &gcwq->worklist;
+ worklist = gcwq_determine_ins_pos(gcwq, cwq);
} else
worklist = &cwq->delayed_works;
@@ -1526,8 +1568,9 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
{
struct work_struct *work = list_first_entry(&cwq->delayed_works,
struct work_struct, entry);
+ struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
- move_linked_works(work, &cwq->gcwq->worklist, NULL);
+ move_linked_works(work, pos, NULL);
cwq->nr_active++;
}
@@ -1634,6 +1677,21 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry);
+ /*
+ * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
+ * wake up another worker; otherwise, clear HIGHPRI_PENDING.
+ */
+ if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
+ struct work_struct *nwork = list_first_entry(&gcwq->worklist,
+ struct work_struct, entry);
+
+ if (!list_empty(&gcwq->worklist) &&
+ get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
+ wake_up_worker(gcwq);
+ else
+ gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
+ }
+
spin_unlock_irq(&gcwq->lock);
work_clear_pending(work);
Currently, as there's only a single worker per cwq, having linked works
doesn't make any visible behavior difference. This change is to
prepare for multiple shared workers per cpu.
Signed-off-by: Tejun Heo <t...@kernel.org>
---
include/linux/workqueue.h | 4 +-
kernel/workqueue.c | 152 ++++++++++++++++++++++++++++++++++++++------
2 files changed, 134 insertions(+), 22 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 8762f62..4f4fdba 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -24,8 +24,9 @@ typedef void (*work_func_t)(struct work_struct *work);
enum {
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
+ WORK_STRUCT_LINKED_BIT = 1, /* next work is linked to this one */
#ifdef CONFIG_DEBUG_OBJECTS_WORK
- WORK_STRUCT_STATIC_BIT = 1, /* static initializer (debugobjects) */
+ WORK_STRUCT_STATIC_BIT = 2, /* static initializer (debugobjects) */
WORK_STRUCT_COLOR_SHIFT = 3, /* color for workqueue flushing */
#else
WORK_STRUCT_COLOR_SHIFT = 2, /* color for workqueue flushing */
@@ -34,6 +35,7 @@ enum {
WORK_STRUCT_COLOR_BITS = 4,
WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
+ WORK_STRUCT_LINKED = 1 << WORK_STRUCT_LINKED_BIT,
#ifdef CONFIG_DEBUG_OBJECTS_WORK
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,
#else
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 0b0c360..74b399b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -51,6 +51,7 @@ struct cpu_workqueue_struct;
struct worker {
struct work_struct *current_work; /* L: work being processed */
+ struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
int id; /* I: worker id */
@@ -445,6 +446,8 @@ static struct worker *alloc_worker(void)
struct worker *worker;
worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+ if (worker)
+ INIT_LIST_HEAD(&worker->scheduled);
return worker;
}
@@ -530,6 +533,7 @@ static void destroy_worker(struct worker *worker)
/* sanity check frenzy */
BUG_ON(worker->current_work);
+ BUG_ON(!list_empty(&worker->scheduled));
kthread_stop(worker->task);
kfree(worker);
@@ -540,6 +544,47 @@ static void destroy_worker(struct worker *worker)
}
/**
+ * move_linked_works - move linked works to a list
+ * @work: start of series of works to be scheduled
+ * @head: target list to append @work to
+ * @nextp: out parameter for nested worklist walking
+ *
+ * Schedule linked works starting from @work to @head. Work series to
+ * be scheduled starts at @work and includes any consecutive work with
+ * WORK_STRUCT_LINKED set in its predecessor.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of
+ * the last scheduled work. This allows move_linked_works() to be
+ * nested inside outer list_for_each_entry_safe().
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void move_linked_works(struct work_struct *work, struct list_head *head,
+ struct work_struct **nextp)
+{
+ struct work_struct *n;
+
+ /*
+ * Linked worklist will always end before the end of the list,
+ * use NULL for list head.
+ */
+ list_for_each_entry_safe_from(work, n, NULL, entry) {
+ list_move_tail(&work->entry, head);
+ if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
+ break;
+ }
+
+ /*
+ * If we're already inside safe list traversal and have moved
+ * multiple works to the scheduled queue, the next position
+ * needs to be updated.
+ */
+ if (nextp)
+ *nextp = n;
+}
+
+/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
* @color: color of work which left the queue
@@ -639,17 +684,25 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
cwq_dec_nr_in_flight(cwq, work_color);
}
-static void run_workqueue(struct worker *worker)
+/**
+ * process_scheduled_works - process scheduled works
+ * @worker: self
+ *
+ * Process all scheduled works. Please note that the scheduled list
+ * may change while processing a work, so this function repeatedly
+ * fetches a work from the top and executes it.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void process_scheduled_works(struct worker *worker)
{
- struct cpu_workqueue_struct *cwq = worker->cwq;
-
- spin_lock_irq(&cwq->lock);
- while (!list_empty(&cwq->worklist)) {
- struct work_struct *work = list_entry(cwq->worklist.next,
+ while (!list_empty(&worker->scheduled)) {
+ struct work_struct *work = list_first_entry(&worker->scheduled,
struct work_struct, entry);
process_one_work(worker, work);
}
- spin_unlock_irq(&cwq->lock);
}
/**
@@ -684,7 +737,28 @@ static int worker_thread(void *__worker)
get_cpu_mask(cwq->cpu))))
set_cpus_allowed_ptr(worker->task,
get_cpu_mask(cwq->cpu));
- run_workqueue(worker);
+
+ spin_lock_irq(&cwq->lock);
+
+ while (!list_empty(&cwq->worklist)) {
+ struct work_struct *work =
+ list_first_entry(&cwq->worklist,
+ struct work_struct, entry);
+
+ if (likely(!(*work_data_bits(work) &
+ WORK_STRUCT_LINKED))) {
+ /* optimization path, not strictly necessary */
+ process_one_work(worker, work);
+ if (unlikely(!list_empty(&worker->scheduled)))
+ process_scheduled_works(worker);
+ } else {
+ move_linked_works(work, &worker->scheduled,
+ NULL);
+ process_scheduled_works(worker);
+ }
+ }
+
+ spin_unlock_irq(&cwq->lock);
}
return 0;
@@ -705,16 +779,33 @@ static void wq_barrier_func(struct work_struct *work)
* insert_wq_barrier - insert a barrier work
* @cwq: cwq to insert barrier into
* @barr: wq_barrier to insert
- * @head: insertion point
+ * @target: target work to attach @barr to
+ * @worker: worker currently executing @target, NULL if @target is not executing
*
- * Insert barrier @barr into @cwq before @head.
+ * @barr is linked to @target such that @barr is completed only after
+ * @target finishes execution. Please note that the ordering
+ * guarantee is observed only with respect to @target and on the local
+ * cpu.
+ *
+ * Currently, a queued barrier can't be canceled. This is because
+ * try_to_grab_pending() can't determine whether the work to be
+ * grabbed is at the head of the queue and thus can't clear LINKED
+ * flag of the previous work while there must be a valid next work
+ * after a work with LINKED flag set.
+ *
+ * Note that when @worker is non-NULL, @target may be modified
+ * underneath us, so we can't reliably determine cwq from @target.
*
* CONTEXT:
* spin_lock_irq(cwq->lock).
*/
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
- struct wq_barrier *barr, struct list_head *head)
+ struct wq_barrier *barr,
+ struct work_struct *target, struct worker *worker)
{
+ struct list_head *head;
+ unsigned int linked = 0;
+
/*
* debugobject calls are safe here even with cwq->lock locked
* as we know for sure that this will not trigger any of the
@@ -725,8 +816,24 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);
+ /*
+ * If @target is currently being executed, schedule the
+ * barrier to the worker; otherwise, put it after @target.
+ */
+ if (worker)
+ head = worker->scheduled.next;
+ else {
+ unsigned long *bits = work_data_bits(target);
+
+ head = target->entry.next;
+ /* there can already be other linked works, inherit and set */
+ linked = *bits & WORK_STRUCT_LINKED;
+ __set_bit(WORK_STRUCT_LINKED_BIT, bits);
+ }
+
debug_work_activate(&barr->work);
- insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
+ insert_work(cwq, &barr->work, head,
+ work_color_to_flags(WORK_NO_COLOR) | linked);
}
/**
@@ -964,8 +1071,8 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
*/
int flush_work(struct work_struct *work)
{
+ struct worker *worker = NULL;
struct cpu_workqueue_struct *cwq;
- struct list_head *prev;
struct wq_barrier barr;
might_sleep();
@@ -985,14 +1092,14 @@ int flush_work(struct work_struct *work)
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
goto already_gone;
- prev = &work->entry;
} else {
- if (!cwq->worker || cwq->worker->current_work != work)
+ if (cwq->worker && cwq->worker->current_work == work)
+ worker = cwq->worker;
+ if (!worker)
goto already_gone;
- prev = &cwq->worklist;
}
- insert_wq_barrier(cwq, &barr, prev->next);
+ insert_wq_barrier(cwq, &barr, work, worker);
spin_unlock_irq(&cwq->lock);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
@@ -1048,16 +1155,19 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
struct wq_barrier barr;
- int running = 0;
+ struct worker *worker;
spin_lock_irq(&cwq->lock);
+
+ worker = NULL;
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
- insert_wq_barrier(cwq, &barr, cwq->worklist.next);
- running = 1;
+ worker = cwq->worker;
+ insert_wq_barrier(cwq, &barr, work, worker);
}
+
spin_unlock_irq(&cwq->lock);
- if (unlikely(running)) {
+ if (unlikely(worker)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
cwq->max_active can be specified via the new @max_active parameter to
__create_workqueue() and is set to 1 for all workqueues for now. As
each cwq has only a single worker now, this double queueing doesn't
cause any behavior difference visible to its users.
This will be used to reimplement freeze/thaw and implement shared
worker pool.
Signed-off-by: Tejun Heo <t...@kernel.org>
---
include/linux/workqueue.h | 18 +++++++++---------
kernel/workqueue.c | 39 +++++++++++++++++++++++++++++++++++++--
2 files changed, 46 insertions(+), 11 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 4f4fdba..eb753b7 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -225,11 +225,11 @@ enum {
};
extern struct workqueue_struct *
-__create_workqueue_key(const char *name, unsigned int flags,
+__create_workqueue_key(const char *name, unsigned int flags, int max_active,
struct lock_class_key *key, const char *lock_name);
#ifdef CONFIG_LOCKDEP
-#define __create_workqueue(name, flags) \
+#define __create_workqueue(name, flags, max_active) \
({ \
static struct lock_class_key __key; \
const char *__lock_name; \
@@ -239,20 +239,20 @@ __create_workqueue_key(const char *name, unsigned int flags,
else \
__lock_name = #name; \
\
- __create_workqueue_key((name), (flags), &__key, \
- __lock_name); \
+ __create_workqueue_key((name), (flags), (max_active), \
+ &__key, __lock_name); \
})
#else
-#define __create_workqueue(name, flags) \
- __create_workqueue_key((name), (flags), NULL, NULL)
+#define __create_workqueue(name, flags, max_active) \
+ __create_workqueue_key((name), (flags), (max_active), NULL, NULL)
#endif
#define create_workqueue(name) \
- __create_workqueue((name), 0)
+ __create_workqueue((name), 0, 1)
#define create_freezeable_workqueue(name) \
- __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD)
+ __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1)
#define create_singlethread_workqueue(name) \
- __create_workqueue((name), WQ_SINGLE_THREAD)
+ __create_workqueue((name), WQ_SINGLE_THREAD, 1)
extern void destroy_workqueue(struct workqueue_struct *wq);
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 74b399b..101b92e 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -77,6 +77,9 @@ struct cpu_workqueue_struct {
int flush_color; /* L: flushing color */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
+ int nr_active; /* L: nr of active works */
+ int max_active; /* I: max active works */
+ struct list_head delayed_works; /* L: delayed works */
};
/*
@@ -321,14 +324,24 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
+ struct list_head *worklist;
unsigned long flags;
debug_work_activate(work);
+
spin_lock_irqsave(&cwq->lock, flags);
BUG_ON(!list_empty(&work->entry));
+
cwq->nr_in_flight[cwq->work_color]++;
- insert_work(cwq, work, &cwq->worklist,
- work_color_to_flags(cwq->work_color));
+
+ if (likely(cwq->nr_active < cwq->max_active)) {
+ cwq->nr_active++;
+ worklist = &cwq->worklist;
+ } else
+ worklist = &cwq->delayed_works;
+
+ insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
+
spin_unlock_irqrestore(&cwq->lock, flags);
}
@@ -584,6 +597,15 @@ static void move_linked_works(struct work_struct *work, struct list_head *head,
*nextp = n;
}
+static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
+{
+ struct work_struct *work = list_first_entry(&cwq->delayed_works,
+ struct work_struct, entry);
+
+ move_linked_works(work, &cwq->worklist, NULL);
+ cwq->nr_active++;
+}
+
/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
@@ -602,6 +624,12 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
return;
cwq->nr_in_flight[color]--;
+ cwq->nr_active--;
+
+ /* one down, submit a delayed one */
+ if (!list_empty(&cwq->delayed_works) &&
+ cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
/* is flush in progress and are we at the flushing tip? */
if (likely(cwq->flush_color != color))
@@ -1499,6 +1527,7 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs)
struct workqueue_struct *__create_workqueue_key(const char *name,
unsigned int flags,
+ int max_active,
struct lock_class_key *key,
const char *lock_name)
{
@@ -1507,6 +1536,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
bool failed = false;
unsigned int cpu;
+ max_active = clamp_val(max_active, 1, INT_MAX);
+
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
goto err;
@@ -1538,8 +1569,10 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
cwq->cpu = cpu;
cwq->wq = wq;
cwq->flush_color = -1;
+ cwq->max_active = max_active;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
+ INIT_LIST_HEAD(&cwq->delayed_works);
init_waitqueue_head(&cwq->more_work);
if (failed)
@@ -1601,6 +1634,8 @@ void destroy_workqueue(struct workqueue_struct *wq)
for (i = 0; i < WORK_NR_COLORS; i++)
BUG_ON(cwq->nr_in_flight[i]);
+ BUG_ON(cwq->nr_active);
+ BUG_ON(!list_empty(&cwq->delayed_works));
}
free_cwqs(wq->cpu_wq);
Reimplement single thread (st) workqueue using dynamic single cpu
binding and cwq->limit. WQ_SINGLE_THREAD is replaced with
WQ_SINGLE_CPU. In a single cpu workqueue, at most a single cwq is
bound to the wq at any given time. Arbitration is done using atomic
accesses to wq->single_cpu when queueing a work. Once bound, the
binding stays till the workqueue is drained.
Note that the binding is never broken while a workqueue is frozen.
This is because idle cwqs may have works waiting in delayed_works
queue while frozen. On thaw, the cwq is restarted if there are any
delayed works or unbound otherwise.
When combined with max_active limit of 1, single cpu workqueue has
exactly the same execution properties as the original single thread
workqueue while allowing sharing of per-cpu workers.
Signed-off-by: Tejun Heo <t...@kernel.org>
---
include/linux/workqueue.h | 6 +-
kernel/workqueue.c | 135 +++++++++++++++++++++++++++++++++------------
2 files changed, 103 insertions(+), 38 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ab0b7fb..10611f7 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -221,7 +221,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
enum {
WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
- WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */
+ WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
};
extern struct workqueue_struct *
@@ -250,9 +250,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active,
#define create_workqueue(name) \
__create_workqueue((name), 0, 1)
#define create_freezeable_workqueue(name) \
- __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1)
+ __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU, 1)
#define create_singlethread_workqueue(name) \
- __create_workqueue((name), WQ_SINGLE_THREAD, 1)
+ __create_workqueue((name), WQ_SINGLE_CPU, 1)
extern void destroy_workqueue(struct workqueue_struct *wq);
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5cd155d..2ce895e 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -114,8 +114,7 @@ struct global_cwq {
} ____cacheline_aligned_in_smp;
/*
- * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
+ * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of
* work_struct->data are used for flags and thus cwqs need to be
* aligned at two's power of the number of flag bits.
*/
@@ -159,6 +158,8 @@ struct workqueue_struct {
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */
+ unsigned long single_cpu; /* cpu for single cpu wq */
+
int saved_max_active; /* I: saved cwq max_active */
const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq);
static int worker_thread(void *__worker);
-static int singlethread_cpu __read_mostly;
-
static struct global_cwq *get_gcwq(unsigned int cpu)
{
return &per_cpu(global_cwq, cpu);
@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
return per_cpu_ptr(wq->cpu_wq, cpu);
}
-static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
- struct workqueue_struct *wq)
-{
- if (unlikely(wq->flags & WQ_SINGLE_THREAD))
- cpu = singlethread_cpu;
- return get_cwq(cpu, wq);
-}
-
static unsigned int work_color_to_flags(int color)
{
return color << WORK_STRUCT_COLOR_SHIFT;
@@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
wake_up_process(cwq->worker->task);
}
+/**
+ * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
+ * @cwq: cwq to unbind
+ *
+ * Try to unbind @cwq from single cpu workqueue processing. If
+ * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
+{
+ struct workqueue_struct *wq = cwq->wq;
+ struct global_cwq *gcwq = cwq->gcwq;
+
+ BUG_ON(wq->single_cpu != gcwq->cpu);
+ /*
+ * Unbind from workqueue if @cwq is not frozen. If frozen,
+ * thaw_workqueues() will either restart processing on this
+ * cpu or unbind if empty. This keeps works queued while
+ * frozen fully ordered and flushable.
+ */
+ if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
+ smp_wmb(); /* paired with cmpxchg() in __queue_work() */
+ wq->single_cpu = NR_CPUS;
+ }
+}
+
static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
- struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
- struct global_cwq *gcwq = cwq->gcwq;
+ struct global_cwq *gcwq;
+ struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned long flags;
+ bool arbitrate;
debug_work_activate(work);
- spin_lock_irqsave(&gcwq->lock, flags);
+ /* determine gcwq to use */
+ if (!(wq->flags & WQ_SINGLE_CPU)) {
+ /* just use the requested cpu for multicpu workqueues */
+ gcwq = get_gcwq(cpu);
+ spin_lock_irqsave(&gcwq->lock, flags);
+ } else {
+ unsigned int req_cpu = cpu;
+
+ /*
+ * It's a bit more complex for single cpu workqueues.
+ * We first need to determine which cpu is going to be
+ * used. If no cpu is currently serving this
+ * workqueue, arbitrate using atomic accesses to
+ * wq->single_cpu; otherwise, use the current one.
+ */
+ retry:
+ cpu = wq->single_cpu;
+ arbitrate = cpu == NR_CPUS;
+ if (arbitrate)
+ cpu = req_cpu;
+
+ gcwq = get_gcwq(cpu);
+ spin_lock_irqsave(&gcwq->lock, flags);
+
+ /*
+ * The following cmpxchg() is a full barrier paired
+ * with smp_wmb() in cwq_unbind_single_cpu() and
+ * guarantees that all changes to wq->st_* fields are
+ * visible on the new cpu after this point.
+ */
+ if (arbitrate)
+ cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
+
+ if (unlikely(wq->single_cpu != cpu)) {
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+ goto retry;
+ }
+ }
+
+ /* gcwq determined, get cwq and queue */
+ cwq = get_cwq(gcwq->cpu, wq);
+
BUG_ON(!list_empty(&work->entry));
cwq->nr_in_flight[cwq->work_color]++;
@@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
timer_stats_timer_set_start_info(&dwork->timer);
/* This stores cwq for the moment, for the timer_fn */
- set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
+ set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
@@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
cwq->nr_in_flight[color]--;
cwq->nr_active--;
- /* one down, submit a delayed one */
- if (!list_empty(&cwq->delayed_works) &&
- cwq->nr_active < cwq->max_active)
- cwq_activate_first_delayed(cwq);
+ if (!list_empty(&cwq->delayed_works)) {
+ /* one down, submit a delayed one */
+ if (cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
+ } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
+ /* this was the last work, unbind from single cpu */
+ cwq_unbind_single_cpu(cwq);
+ }
/* is flush in progress and are we at the flushing tip? */
if (likely(cwq->flush_color != color))
@@ -1721,7 +1786,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct lock_class_key *key,
const char *lock_name)
{
- bool singlethread = flags & WQ_SINGLE_THREAD;
struct workqueue_struct *wq;
bool failed = false;
unsigned int cpu;
@@ -1742,6 +1806,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
+ wq->single_cpu = NR_CPUS;
+
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
@@ -1767,8 +1833,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
if (failed)
continue;
- cwq->worker = create_worker(cwq,
- cpu_online(cpu) && !singlethread);
+ cwq->worker = create_worker(cwq, cpu_online(cpu));
if (cwq->worker)
start_worker(cwq->worker);
else
@@ -1952,18 +2017,16 @@ static int __cpuinit trustee_thread(void *__gcwq)
spin_lock_irq(&gcwq->lock);
/*
- * Make all multithread workers rogue. Trustee must be bound
- * to the target cpu and can't be cancelled.
+ * Make all workers rogue. Trustee must be bound to the
+ * target cpu and can't be cancelled.
*/
BUG_ON(gcwq->cpu != smp_processor_id());
list_for_each_entry(worker, &gcwq->idle_list, entry)
- if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
- worker->flags |= WORKER_ROGUE;
+ worker->flags |= WORKER_ROGUE;
for_each_busy_worker(worker, i, pos, gcwq)
- if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
- worker->flags |= WORKER_ROGUE;
+ worker->flags |= WORKER_ROGUE;
/*
* We're now in charge. Notify and proceed to drain. We need
@@ -2068,14 +2131,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
wait_trustee_state(gcwq, TRUSTEE_DONE);
}
- /* clear ROGUE from all multithread workers */
+ /* clear ROGUE from all workers */
list_for_each_entry(worker, &gcwq->idle_list, entry)
- if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
- worker->flags &= ~WORKER_ROGUE;
+ worker->flags &= ~WORKER_ROGUE;
for_each_busy_worker(worker, i, pos, gcwq)
- if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
- worker->flags &= ~WORKER_ROGUE;
+ worker->flags &= ~WORKER_ROGUE;
break;
}
@@ -2260,6 +2321,11 @@ void thaw_workqueues(void)
cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
+ /* perform delayed unbind from single cpu if empty */
+ if (wq->single_cpu == gcwq->cpu &&
+ !cwq->nr_active && list_empty(&cwq->delayed_works))
+ cwq_unbind_single_cpu(cwq);
+
wake_up_process(cwq->worker->task);
}
@@ -2285,7 +2351,6 @@ void __init init_workqueues(void)
BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
__alignof__(unsigned long long));
- singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
/* initialize gcwqs */
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@infradead.org>
---
kernel/async.c | 140 ++++++++-----------------------------------------------
1 files changed, 21 insertions(+), 119 deletions(-)
diff --git a/kernel/async.c b/kernel/async.c
index 15319d6..c285258 100644
--- a/kernel/async.c
+++ b/kernel/async.c
@@ -49,40 +49,32 @@ asynchronous and synchronous parts of the kernel.
*/
#include <linux/async.h>
-#include <linux/bug.h>
#include <linux/module.h>
#include <linux/wait.h>
#include <linux/sched.h>
-#include <linux/init.h>
-#include <linux/kthread.h>
-#include <linux/delay.h>
#include <linux/slab.h>
#include <asm/atomic.h>
static async_cookie_t next_cookie = 1;
-#define MAX_THREADS 256
#define MAX_WORK 32768
static LIST_HEAD(async_pending);
static LIST_HEAD(async_running);
static DEFINE_SPINLOCK(async_lock);
-static int async_enabled = 0;
-
struct async_entry {
- struct list_head list;
- async_cookie_t cookie;
- async_func_ptr *func;
- void *data;
- struct list_head *running;
+ struct list_head list;
+ struct work_struct work;
+ async_cookie_t cookie;
+ async_func_ptr *func;
+ void *data;
+ struct list_head *running;
};
static DECLARE_WAIT_QUEUE_HEAD(async_done);
-static DECLARE_WAIT_QUEUE_HEAD(async_new);
static atomic_t entry_count;
-static atomic_t thread_count;
extern int initcall_debug;
@@ -117,27 +109,23 @@ static async_cookie_t lowest_in_progress(struct list_head *running)
spin_unlock_irqrestore(&async_lock, flags);
return ret;
}
+
/*
* pick the first pending entry and run it
*/
-static void run_one_entry(void)
+static void async_run_entry_fn(struct work_struct *work)
{
+ struct async_entry *entry =
+ container_of(work, struct async_entry, work);
unsigned long flags;
- struct async_entry *entry;
ktime_t calltime, delta, rettime;
- /* 1) pick one task from the pending queue */
-
+ /* 1) move self to the running queue */
spin_lock_irqsave(&async_lock, flags);
- if (list_empty(&async_pending))
- goto out;
- entry = list_first_entry(&async_pending, struct async_entry, list);
-
- /* 2) move it to the running queue */
list_move_tail(&entry->list, entry->running);
spin_unlock_irqrestore(&async_lock, flags);
- /* 3) run it (and print duration)*/
+ /* 2) run (and print duration) */
if (initcall_debug && system_state == SYSTEM_BOOTING) {
printk("calling %lli_%pF @ %i\n", (long long)entry->cookie,
entry->func, task_pid_nr(current));
@@ -153,31 +141,25 @@ static void run_one_entry(void)
(long long)ktime_to_ns(delta) >> 10);
}
- /* 4) remove it from the running queue */
+ /* 3) remove self from the running queue */
spin_lock_irqsave(&async_lock, flags);
list_del(&entry->list);
- /* 5) free the entry */
+ /* 4) free the entry */
kfree(entry);
atomic_dec(&entry_count);
spin_unlock_irqrestore(&async_lock, flags);
- /* 6) wake up any waiters. */
+ /* 5) wake up any waiters */
wake_up(&async_done);
- return;
-
-out:
- spin_unlock_irqrestore(&async_lock, flags);
}
-
static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct list_head *running)
{
struct async_entry *entry;
unsigned long flags;
async_cookie_t newcookie;
-
/* allow irq-off callers */
entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC);
@@ -186,7 +168,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
* If we're out of memory or if there's too much work
* pending already, we execute synchronously.
*/
- if (!async_enabled || !entry || atomic_read(&entry_count) > MAX_WORK) {
+ if (!entry || atomic_read(&entry_count) > MAX_WORK) {
kfree(entry);
spin_lock_irqsave(&async_lock, flags);
newcookie = next_cookie++;
@@ -196,6 +178,7 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
ptr(data, newcookie);
return newcookie;
}
+ INIT_WORK(&entry->work, async_run_entry_fn);
entry->func = ptr;
entry->data = data;
entry->running = running;
@@ -205,7 +188,10 @@ static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct l
list_add_tail(&entry->list, &async_pending);
atomic_inc(&entry_count);
spin_unlock_irqrestore(&async_lock, flags);
- wake_up(&async_new);
+
+ /* schedule for execution */
+ queue_work(system_long_wq, &entry->work);
+
return newcookie;
}
@@ -312,87 +298,3 @@ void async_synchronize_cookie(async_cookie_t cookie)
async_synchronize_cookie_domain(cookie, &async_running);
}
EXPORT_SYMBOL_GPL(async_synchronize_cookie);
-
-
-static int async_thread(void *unused)
-{
- DECLARE_WAITQUEUE(wq, current);
- add_wait_queue(&async_new, &wq);
-
- while (!kthread_should_stop()) {
- int ret = HZ;
- set_current_state(TASK_INTERRUPTIBLE);
- /*
- * check the list head without lock.. false positives
- * are dealt with inside run_one_entry() while holding
- * the lock.
- */
- rmb();
- if (!list_empty(&async_pending))
- run_one_entry();
- else
- ret = schedule_timeout(HZ);
-
- if (ret == 0) {
- /*
- * we timed out, this means we as thread are redundant.
- * we sign off and die, but we to avoid any races there
- * is a last-straw check to see if work snuck in.
- */
- atomic_dec(&thread_count);
- wmb(); /* manager must see our departure first */
- if (list_empty(&async_pending))
- break;
- /*
- * woops work came in between us timing out and us
- * signing off; we need to stay alive and keep working.
- */
- atomic_inc(&thread_count);
- }
- }
- remove_wait_queue(&async_new, &wq);
-
- return 0;
-}
-
-static int async_manager_thread(void *unused)
-{
- DECLARE_WAITQUEUE(wq, current);
- add_wait_queue(&async_new, &wq);
-
- while (!kthread_should_stop()) {
- int tc, ec;
-
- set_current_state(TASK_INTERRUPTIBLE);
-
- tc = atomic_read(&thread_count);
- rmb();
- ec = atomic_read(&entry_count);
-
- while (tc < ec && tc < MAX_THREADS) {
- if (IS_ERR(kthread_run(async_thread, NULL, "async/%i",
- tc))) {
- msleep(100);
- continue;
- }
- atomic_inc(&thread_count);
- tc++;
- }
-
- schedule();
- }
- remove_wait_queue(&async_new, &wq);
-
- return 0;
-}
-
-static int __init async_init(void)
-{
- async_enabled =
- !IS_ERR(kthread_run(async_manager_thread, NULL, "async/mgr"));
-
- WARN_ON(!async_enabled);
- return 0;
-}
-
-core_initcall(async_init);
It appeared to me that async is deemed to parallelize as much as
possible, to probe devices faster on boot for example, while cmwq
seems to do the opposite: trying to execute in batches as much as
possible, and fork when a work goes to sleep voluntarily.
That said I haven't checked that deeply so it's fairly possible
I missed something obvious :)
On 06/29/2010 12:55 AM, Frederic Weisbecker wrote:
> On Mon, Jun 28, 2010 at 11:04:22PM +0200, Tejun Heo wrote:
>> Replace private worker pool with system_long_wq.
>
> It appeared to me that async is deemed to parallelize as much as
> possible, to probe devices faster on boot for example, while cmwq
> seems to do the opposite: trying to execute in batches as much as
> possible, and fork when a work goes to sleep voluntarily.
Yeah, well, that's kind of the whole point of cmwq. It would try to
minimize the number of used workers but the provided concurrency will
still be enough. No async probe will be stalled due to lack of
execution context and the timings should be about the same between the
original async implementation and cmwq based one.
Thanks.
--
tejun
Right. I just don't know what is supposed to be slow on boot that needs
to use async. Is that because reading some ports is slow or because we
need to do something and wait for some time to get the result?
If there is a question of slow ports to probe, then cmwq wouldn't seem the
right thing here, as it only forks when we go to sleep.
On 06/29/2010 02:18 PM, Frederic Weisbecker wrote:
>> Yeah, well, that's kind of the whole point of cmwq. It would try to
>> minimize the number of used workers but the provided concurrency will
>> still be enough. No async probe will be stalled due to lack of
>> execution context and the timings should be about the same between the
>> original async implementation and cmwq based one.
>
> Right. I just don't know what is supposed to be slow on boot that
> needs to use async. Is that because reading some ports is slow or
> because we need to do something and wait for some times to get the
> result.
It's things like ATA bus resetting and probing. They're usually
composed of short CPU activities and rather long sleeps.
> If there is a question of slow ports to probe, then cmwq wouldn't seem the
> right thing here, as it only forks when we go to sleep.
I lost you here. If something during boot has to burn cpu cycles
(which it shouldn't, really), it has to burn cpu cycles and having
multiple concurrent threads won't help anything. If something doesn't
burn cpu cycles but takes long, it gotta sleep and cmwq will start a
new thread immediately. So, can you please elaborate why cmwq would
be problematic?
Thanks.
--
tejun
Ok.
> > If there is a question of slow ports to probe, then cmwq wouldn't seem the
> > right thing here, as it only forks when we go to sleep.
>
> I lost you here. If something during boot has to burn cpu cycles
> (which it shouldn't, really), it has to burn cpu cycles and having
> multiple concurrent threads won't help anything.
It would on SMP.
> If something doesn't
> burn cpu cycles but takes long, it gotta sleep and cmwq will start a
> new thread immediately. So, can you please elaborate why cmwq would
> be problematic?
No in this case it's not problematic, as far as the things that were using
async have a small cpu burn and long sleep waiting, it looks like cmwq
fits :)
On 06/29/2010 05:52 PM, Frederic Weisbecker wrote:
>>> If there is a question of slow ports to probe, then cmwq wouldn't seem the
>>> right thing here, as it only forks when we go to sleep.
>>
>> I lost you here. If something during boot has to burn cpu cycles
>> (which it shouldn't, really), it has to burn cpu cycles and having
>> multiple concurrent threads won't help anything.
>
> It would on SMP.
Oh, I see. Parallel cpu hogs. We don't have such users for async and
I think using padata would be the right solution for those situations.
Thanks.
--
tejun
uh? clearly the assumption is that if I have a 16 CPU machine, and 12
items of work get scheduled,
that we get all 12 running in parallel. All the smarts of cmwq surely
only kick in once you've reached the
"one work item per cpu" threshold ???
On 06/29/2010 06:40 PM, Arjan van de Ven wrote:
> uh? clearly the assumption is that if I have a 16 CPU machine, and 12
> items of work get scheduled,
> that we get all 12 running in parallel. All the smarts of cmwq surely
> only kick in once you've reached the
> "one work item per cpu" threshold ???
Hmmm... workqueue workers are bound to certain cpu, so if you schedule
a work on a specific CPU, it will run there. Once a cpu gets
saturated, the issuing thread will be moved elsewhere. I don't think
it matters to any of the current async users one way or the other,
would it?
Thanks.
--
tejun
Thinking more about it. It's now not difficult to add a gcwq for an
unbound pseudo CPU number and use it as host for workers which can run
on any CPU. The automatic concurrency management doesn't make much
sense for those workers, so @max_active can be used as the explicit
concurrency throttle. It's not even gonna take a lot of code but I'm
just not convinced that there's much benefit in doing that. So, yeah,
if necessary, sure, but let's think whether it's gonna be actually
useful.
we might be talking past each other. ;-)
let me define an example that is simple so that we can get on the same page
assume a system with "enough" cpus, say 32.
lets say we have 2 async tasks, that each do an mdelay(1000); (yes I
know stupid, but exaggerating things makes things easier to talk about)
lets also assume that they get scheduled right back to back from the
same code on the same cpu.
will the end result be that the first mdelay() task complete before the
second one gets started, or will the end result be that
the 2nd one will notice the first cpu is busy, and find a second cpu to
run in parallel with.
the point in general is to get maximum parallelism; with systems getting
more and more cores, maximum parallelism is
a good design goal.
On 06/29/2010 08:07 PM, Arjan van de Ven wrote:
> we might be talking past each other. ;-)
>
> let me define an example that is simple so that we can get on the same page
>
> assume a system with "enough" cpus, say 32.
> lets say we have 2 async tasks, that each do an mdelay(1000); (yes I
> know stupid, but exaggerating things makes things easier to talk about)
That's the main point to discuss tho. If you exaggerate the use case
out of proportion, you'll end up with something which in the end is
useful only in the imagination and we'll be doing things just because
we can. Creating full number of unbound threads might look like a
good idea to extract maximum cpu parallelism if you exaggerate the use
case like the above but with the current actual use case, it's not
gonna buy us anything and might even cost us more via unnecessary
thread creations.
So, let's talk about whether it's _actually_ useful for the current
use cases. If so, sure, let's do it that way. If not, there is no
reason to go there, right?
Thanks.
--
tejun
I'm not trying to suggest "unbound".
I'm trying to suggest "don't start bounding until you hit # threads >= #
cpus".
you have some clever tricks to deal with bounding things; but lets make
sure that the simple case
of having less work to run in parallel than the number of cpus gets
dealt with simple and unbound.
You also consolidate the thread pools so that you have one global pool,
so unlike the current situation
where you get O(Nr pools * Nr cpus), you only get O(Nr cpus) number of
threads... that's not too burdensome imo.
If you want to go below that then I think you're going too far in
reducing the number of threads in your pool. Really.
so... back to my question; will those two tasks run in parallel or
sequential ?
On 06/29/2010 08:22 PM, Arjan van de Ven wrote:
> I'm not trying to suggest "unbound". I'm trying to suggest "don't
> start bounding until you hit # threads >= # cpus you have some
> clever tricks to deal with bounding things; but lets make sure that
> the simple case of having less work to run in parallel than the
> number of cpus gets dealt with simple and unbound.
Well, the thing is, for most cases, binding to cpus is simply better.
That's the reason why our default workqueue was per-cpu to begin with.
There just are a lot more opportunities for optimization for both
memory access and synchronization overheads.
> You also consolidate the thread pools so that you have one global
> pool, so unlike the current situation where you get O(Nr pools * Nr
> cpus), you only get O(Nr cpus) number of threads... that's not too
> burdensome imo. If you want to go below that then I think you're
> going too far in reducing the number of threads in your
> pool. Really.
I lost you in the above paragraph, but I think it would be better to
keep kthread pools separate. It behaves much better regarding memory
access locality (work issuer and worker are on the same cpu and stack
and other memory used by worker are likely to be already hot). Also,
we don't do it yet, but when creating kthreads we can allocate the
stack considering NUMA too.
> so... back to my question; will those two tasks run in parallel or
> sequential ?
If they are scheduled on the same cpu, they won't. If that's
something actually necessary, let's implement it. I have no problem
with that. cmwq already can serve as simple execution context
provider without concurrency control and pumping contexts to async
isn't hard at all. I just wanna know whether it's something which is
actually useful. So, where would that be useful?
Thanks.
--
tejun
depends on the user.
For "throw over the wall" work, this is unclear.
Especially in the light of hyperthreading (sharing L1 cache) or even
modern cpus (where many cores share a fast L3 cache).
I'm fine with a solution that has the caller say 'run anywhere' vs 'try
to run local'.
I suspect there will be many many cases of 'run anywhere'.
> isn't hard at all. I just wanna know whether it's something which is
> actually useful. So, where would that be useful?
>
I think it's useful for all users of your worker pool, not (just) async.
it's a severe limitation of the current linux infrastructure, and your
infrastructure has the chance to fix this...
On 06/29/2010 08:41 PM, Arjan van de Ven wrote:
>> Well, the thing is, for most cases, binding to cpus is simply better.
>
> depends on the user.
Heh, yeah, sure, can't disagree with that. :-)
> For "throw over the wall" work, this is unclear. Especially in the
> light of hyperthreading (sharing L1 cache) or even modern cpus
> (where many cores share a fast L3 cache).
There will be many variants of memory configurations and the only way
the generic kernel can optimize memory access is if it localizes stuff
per cpu which is visible to the operating system. That's the lowest
common denominator. From there, we sure can add considerations for
specific shared configurations but I don't think that will be too
common outside of scheduler and maybe memory allocator. It just
becomes too specific to apply to generic kernel core code.
> I'm fine with a solution that has the caller say 'run anywhere' vs
> 'try to run local'. I suspect there will be many many cases of 'run
> anywhere'.
>
>> isn't hard at all. I just wanna know whether it's something which is
Yeah, sure. I can almost view the code in my head right now. If I'm
not completely mistaken, it should be really easy. When a cpu goes
down, all the left works are already executed unbound, so all the
necessary components are already there.
The thing is that once it's not bound to a cpu, where, how and when a
worker runs is best regulated by the scheduler. That's why I kept
talking about wq being simple context provider.
If something is not CPU intensive, CPU parallelism doesn't buy much,
so works which would benefit from parallel execution are likely to be
CPU intensive ones. For CPU intensive tasks, fairness, priority and
all that stuff are pretty important and that's scheduler's job. cmwq
can provide contexts and put some safety limitations but most are best
left to the scheduler.
>> actually useful. So, where would that be useful?
>
> I think it's useful for all users of your worker pool, not (just) async.
>
> it's a severe limitation of the current linux infrastructure, and your
> infrastructure has the chance to fix this...
Yeah, there could be situations where having a generic context
provider can be useful. I'm just not sure async falls in that
category. For the current users, I think we would be (marginally)
better off with bound workers. So, that's the reluctance I have about
updating async conversion.
Thanks.
--
tejun
> Hmmm... workqueue workers are bound to certain cpu, so if you schedule
> a work on a specific CPU, it will run there.
That's my main problem with using cmwq to replace slow-work.
> Once a cpu gets saturated, the issuing thread will be moved elsewhere.
Assuming that the issuing thread isn't bound by the condition specified in the
previous sentence...
David
* It uses a dedicated gcwq with a pseudo CPU number WORK_CPU_UNBOUND.
This gcwq is always online and disassociated.
* Workers are not bound to any CPU and not concurrency managed. Works
are dispatched to workers as soon as possible and the only applied
limitation is @max_active. IOW, all unbound workqueues are
implicitly high priority.
Unbound workqueues can be used as simple execution context provider.
Contexts unbound to any cpu are served as soon as possible.
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@linux.intel.com>
Cc: David Howells <dhow...@redhat.com>
---
include/linux/workqueue.h | 15 +++-
kernel/workqueue.c | 218 +++++++++++++++++++++++++++++++++------------
2 files changed, 173 insertions(+), 60 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 139069a..67ce734 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -51,7 +51,8 @@ enum {
WORK_NO_COLOR = WORK_NR_COLORS,
/* special cpu IDs */
- WORK_CPU_NONE = NR_CPUS,
+ WORK_CPU_UNBOUND = NR_CPUS,
+ WORK_CPU_NONE = NR_CPUS + 1,
WORK_CPU_LAST = WORK_CPU_NONE,
/*
@@ -237,11 +238,17 @@ enum {
WQ_RESCUER = 1 << 3, /* has an rescue worker */
WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */
+ WQ_UNBOUND = 1 << 6, /* not bound to any cpu */
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
+ WQ_MAX_UNBOUND_PER_CPU = 4, /* 4 * #cpus for unbound wq */
WQ_DFL_ACTIVE = WQ_MAX_ACTIVE / 2,
};
+/* unbound wq's aren't per-cpu, scale max_active according to #cpus */
+#define WQ_UNBOUND_MAX_ACTIVE \
+ max_t(int, WQ_MAX_ACTIVE, num_possible_cpus() * WQ_MAX_UNBOUND_PER_CPU)
+
/*
* System-wide workqueues which are always present.
*
@@ -256,10 +263,16 @@ enum {
* system_nrt_wq is non-reentrant and guarantees that any given work
* item is never executed in parallel by multiple CPUs. Queue
* flushing might take relatively long.
+ *
+ * system_unbound_wq is unbound workqueue. Workers are not bound to
+ * any specific CPU, not concurrency managed, and all queued works are
+ * executed immediately as long as max_active limit is not reached and
+ * resources are available.
*/
extern struct workqueue_struct *system_wq;
extern struct workqueue_struct *system_long_wq;
extern struct workqueue_struct *system_nrt_wq;
+extern struct workqueue_struct *system_unbound_wq;
extern struct workqueue_struct *
__alloc_workqueue_key(const char *name, unsigned int flags, int max_active,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index a105ddf..4608563 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -53,9 +53,10 @@ enum {
WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
WORKER_REBIND = 1 << 5, /* mom is home, come back */
WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
+ WORKER_UNBOUND = 1 << 7, /* worker is unbound */
WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
- WORKER_CPU_INTENSIVE,
+ WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
/* gcwq->trustee_state */
TRUSTEE_START = 0, /* start */
@@ -96,7 +97,7 @@ enum {
* X: During normal operation, modification requires gcwq->lock and
* should be done only from local cpu. Either disabling preemption
* on local cpu or grabbing gcwq->lock is enough for read access.
- * While trustee is in charge, it's identical to L.
+ * If GCWQ_DISASSOCIATED is set, it's identical to L.
*
* F: wq->flush_mutex protected.
*
@@ -220,14 +221,52 @@ struct workqueue_struct {
struct workqueue_struct *system_wq __read_mostly;
struct workqueue_struct *system_long_wq __read_mostly;
struct workqueue_struct *system_nrt_wq __read_mostly;
+struct workqueue_struct *system_unbound_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
EXPORT_SYMBOL_GPL(system_long_wq);
EXPORT_SYMBOL_GPL(system_nrt_wq);
+EXPORT_SYMBOL_GPL(system_unbound_wq);
#define for_each_busy_worker(worker, i, pos, gcwq) \
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \
hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
+static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
+ unsigned int sw)
+{
+ if (cpu < nr_cpu_ids) {
+ if (sw & 1) {
+ cpu = cpumask_next(cpu, mask);
+ if (cpu < nr_cpu_ids)
+ return cpu;
+ }
+ if (sw & 2)
+ return WORK_CPU_UNBOUND;
+ }
+ return WORK_CPU_NONE;
+}
+
+static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
+ struct workqueue_struct *wq)
+{
+ return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
+}
+
+#define for_each_gcwq_cpu(cpu) \
+ for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3); \
+ (cpu) < WORK_CPU_NONE; \
+ (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))
+
+#define for_each_online_gcwq_cpu(cpu) \
+ for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3); \
+ (cpu) < WORK_CPU_NONE; \
+ (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))
+
+#define for_each_cwq_cpu(cpu, wq) \
+ for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq)); \
+ (cpu) < WORK_CPU_NONE; \
+ (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
+
#ifdef CONFIG_DEBUG_OBJECTS_WORK
static struct debug_obj_descr work_debug_descr;
@@ -351,26 +390,46 @@ static bool workqueue_freezing; /* W: have wqs started freezing? */
static DEFINE_PER_CPU(struct global_cwq, global_cwq);
static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
+/*
+ * Global cpu workqueue and nr_running counter for unbound gcwq. The
+ * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
+ * workers have WORKER_UNBOUND set.
+ */
+static struct global_cwq unbound_global_cwq;
+static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0); /* always 0 */
+
static int worker_thread(void *__worker);
static struct global_cwq *get_gcwq(unsigned int cpu)
{
- return &per_cpu(global_cwq, cpu);
+ if (cpu != WORK_CPU_UNBOUND)
+ return &per_cpu(global_cwq, cpu);
+ else
+ return &unbound_global_cwq;
}
static atomic_t *get_gcwq_nr_running(unsigned int cpu)
{
- return &per_cpu(gcwq_nr_running, cpu);
+ if (cpu != WORK_CPU_UNBOUND)
+ return &per_cpu(gcwq_nr_running, cpu);
+ else
+ return &unbound_gcwq_nr_running;
}
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
-#ifndef CONFIG_SMP
- return wq->cpu_wq.single;
+ if (!(wq->flags & WQ_UNBOUND)) {
+ if (likely(cpu < nr_cpu_ids)) {
+#ifdef CONFIG_SMP
+ return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
#else
- return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
+ return wq->cpu_wq.single;
#endif
+ }
+ } else if (likely(cpu == WORK_CPU_UNBOUND))
+ return wq->cpu_wq.single;
+ return NULL;
}
static unsigned int work_color_to_flags(int color)
@@ -453,7 +512,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
if (cpu == WORK_CPU_NONE)
return NULL;
- BUG_ON(cpu >= nr_cpu_ids);
+ BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
return get_gcwq(cpu);
}
@@ -869,11 +928,14 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
debug_work_activate(work);
+ if (unlikely(cpu == WORK_CPU_UNBOUND))
+ cpu = raw_smp_processor_id();
+
/*
* Determine gcwq to use. SINGLE_CPU is inherently
* NON_REENTRANT, so test it first.
*/
- if (!(wq->flags & WQ_SINGLE_CPU)) {
+ if (!(wq->flags & (WQ_SINGLE_CPU | WQ_UNBOUND))) {
struct global_cwq *last_gcwq;
/*
@@ -900,7 +962,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
}
} else
spin_lock_irqsave(&gcwq->lock, flags);
- } else {
+ } else if (!(wq->flags & WQ_UNBOUND)) {
unsigned int req_cpu = cpu;
/*
@@ -932,6 +994,9 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
spin_unlock_irqrestore(&gcwq->lock, flags);
goto retry;
}
+ } else {
+ gcwq = get_gcwq(WORK_CPU_UNBOUND);
+ spin_lock_irqsave(&gcwq->lock, flags);
}
/* gcwq determined, get cwq and queue */
@@ -1166,7 +1231,8 @@ static bool worker_maybe_bind_and_lock(struct worker *worker)
* it races with cpu hotunplug operation. Verify
* against GCWQ_DISASSOCIATED.
*/
- set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
+ if (!(gcwq->flags & GCWQ_DISASSOCIATED))
+ set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
spin_lock_irq(&gcwq->lock);
if (gcwq->flags & GCWQ_DISASSOCIATED)
@@ -1231,8 +1297,9 @@ static struct worker *alloc_worker(void)
*/
static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
{
- int id = -1;
+ bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
struct worker *worker = NULL;
+ int id = -1;
spin_lock_irq(&gcwq->lock);
while (ida_get_new(&gcwq->worker_ida, &id)) {
@@ -1250,8 +1317,12 @@ static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
worker->gcwq = gcwq;
worker->id = id;
- worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
- gcwq->cpu, id);
+ if (!on_unbound_cpu)
+ worker->task = kthread_create(worker_thread, worker,
+ "kworker/%u:%d", gcwq->cpu, id);
+ else
+ worker->task = kthread_create(worker_thread, worker,
+ "kworker/u:%d", id);
if (IS_ERR(worker->task))
goto fail;
@@ -1260,10 +1331,13 @@ static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
* online later on. Make sure every worker has
* PF_THREAD_BOUND set.
*/
- if (bind)
+ if (bind && !on_unbound_cpu)
kthread_bind(worker->task, gcwq->cpu);
- else
+ else {
worker->task->flags |= PF_THREAD_BOUND;
+ if (on_unbound_cpu)
+ worker->flags |= WORKER_UNBOUND;
+ }
return worker;
fail:
@@ -1358,12 +1432,17 @@ static bool send_mayday(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq = get_work_cwq(work);
struct workqueue_struct *wq = cwq->wq;
+ unsigned int cpu;
if (!(wq->flags & WQ_RESCUER))
return false;
/* mayday mayday mayday */
- if (!cpumask_test_and_set_cpu(cwq->gcwq->cpu, wq->mayday_mask))
+ cpu = cwq->gcwq->cpu;
+ /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
+ if (cpu == WORK_CPU_UNBOUND)
+ cpu = 0;
+ if (!cpumask_test_and_set_cpu(cpu, wq->mayday_mask))
wake_up_process(wq->rescuer->task);
return true;
}
@@ -1882,6 +1961,7 @@ static int rescuer_thread(void *__wq)
struct workqueue_struct *wq = __wq;
struct worker *rescuer = wq->rescuer;
struct list_head *scheduled = &rescuer->scheduled;
+ bool is_unbound = wq->flags & WQ_UNBOUND;
unsigned int cpu;
set_user_nice(current, RESCUER_NICE_LEVEL);
@@ -1891,8 +1971,13 @@ repeat:
if (kthread_should_stop())
return 0;
+ /*
+ * See whether any cpu is asking for help. Unbounded
+ * workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.
+ */
for_each_cpu(cpu, wq->mayday_mask) {
- struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
+ struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);
struct global_cwq *gcwq = cwq->gcwq;
struct work_struct *work, *n;
@@ -2034,7 +2119,7 @@ static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
atomic_set(&wq->nr_cwqs_to_flush, 1);
}
- for_each_possible_cpu(cpu) {
+ for_each_cwq_cpu(cpu, wq) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = cwq->gcwq;
@@ -2344,7 +2429,7 @@ static void wait_on_work(struct work_struct *work)
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
- for_each_possible_cpu(cpu)
+ for_each_gcwq_cpu(cpu)
wait_on_cpu_work(get_gcwq(cpu), work);
}
@@ -2590,23 +2675,25 @@ static int alloc_cwqs(struct workqueue_struct *wq)
const size_t size = sizeof(struct cpu_workqueue_struct);
const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
__alignof__(unsigned long long));
-#ifndef CONFIG_SMP
- void *ptr;
- /*
- * Allocate enough room to align cwq and put an extra pointer
- * at the end pointing back to the originally allocated
- * pointer which will be used for free.
- */
- ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
- if (ptr) {
- wq->cpu_wq.single = PTR_ALIGN(ptr, align);
- *(void **)(wq->cpu_wq.single + 1) = ptr;
+ if (CONFIG_SMP && !(wq->flags & WQ_UNBOUND)) {
+ /* on SMP, percpu allocator can align itself */
+ wq->cpu_wq.pcpu = __alloc_percpu(size, align);
+ } else {
+ void *ptr;
+
+ /*
+ * Allocate enough room to align cwq and put an extra
+ * pointer at the end pointing back to the originally
+ * allocated pointer which will be used for free.
+ */
+ ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
+ if (ptr) {
+ wq->cpu_wq.single = PTR_ALIGN(ptr, align);
+ *(void **)(wq->cpu_wq.single + 1) = ptr;
+ }
}
-#else
- /* On SMP, percpu allocator can align itself */
- wq->cpu_wq.pcpu = __alloc_percpu(size, align);
-#endif
+
/* just in case, make sure it's actually aligned */
BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align));
return wq->cpu_wq.v ? 0 : -ENOMEM;
@@ -2614,23 +2701,25 @@ static int alloc_cwqs(struct workqueue_struct *wq)
static void free_cwqs(struct workqueue_struct *wq)
{
-#ifndef CONFIG_SMP
- /* on UP, the pointer to free is stored right after the cwq */
- if (wq->cpu_wq.single)
+ if (CONFIG_SMP && !(wq->flags & WQ_UNBOUND))
+ free_percpu(wq->cpu_wq.pcpu);
+ else if (wq->cpu_wq.single) {
+ /* the pointer to free is stored right after the cwq */
kfree(*(void **)(wq->cpu_wq.single + 1));
-#else
- free_percpu(wq->cpu_wq.pcpu);
-#endif
+ }
}
-static int wq_clamp_max_active(int max_active, const char *name)
+static int wq_clamp_max_active(int max_active, unsigned int flags,
+ const char *name)
{
- if (max_active < 1 || max_active > WQ_MAX_ACTIVE)
+ int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
+
+ if (max_active < 1 || max_active > lim)
printk(KERN_WARNING "workqueue: max_active %d requested for %s "
"is out of range, clamping between %d and %d\n",
- max_active, name, 1, WQ_MAX_ACTIVE);
+ max_active, name, 1, lim);
- return clamp_val(max_active, 1, WQ_MAX_ACTIVE);
+ return clamp_val(max_active, 1, lim);
}
struct workqueue_struct *__alloc_workqueue_key(const char *name,
@@ -2642,8 +2731,15 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
struct workqueue_struct *wq;
unsigned int cpu;
+ /*
+ * Unbound workqueues aren't concurrency managed and should be
+ * dispatched to workers immediately.
+ */
+ if (flags & WQ_UNBOUND)
+ flags |= WQ_HIGHPRI;
+
max_active = max_active ?: WQ_DFL_ACTIVE;
- max_active = wq_clamp_max_active(max_active, name);
+ max_active = wq_clamp_max_active(max_active, flags, name);
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
@@ -2664,7 +2760,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
if (alloc_cwqs(wq) < 0)
goto err;
- for_each_possible_cpu(cpu) {
+ for_each_cwq_cpu(cpu, wq) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = get_gcwq(cpu);
@@ -2703,7 +2799,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
spin_lock(&workqueue_lock);
if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
- for_each_possible_cpu(cpu)
+ for_each_cwq_cpu(cpu, wq)
get_cwq(cpu, wq)->max_active = 0;
list_add(&wq->list, &workqueues);
@@ -2743,7 +2839,7 @@ void destroy_workqueue(struct workqueue_struct *wq)
spin_unlock(&workqueue_lock);
/* sanity check */
- for_each_possible_cpu(cpu) {
+ for_each_cwq_cpu(cpu, wq) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
int i;
@@ -2777,13 +2873,13 @@ void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
{
unsigned int cpu;
- max_active = wq_clamp_max_active(max_active, wq->name);
+ max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
spin_lock(&workqueue_lock);
wq->saved_max_active = max_active;
- for_each_possible_cpu(cpu) {
+ for_each_cwq_cpu(cpu, wq) {
struct global_cwq *gcwq = get_gcwq(cpu);
spin_lock_irq(&gcwq->lock);
@@ -3310,7 +3406,7 @@ void freeze_workqueues_begin(void)
BUG_ON(workqueue_freezing);
workqueue_freezing = true;
- for_each_possible_cpu(cpu) {
+ for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
struct workqueue_struct *wq;
@@ -3322,7 +3418,7 @@ void freeze_workqueues_begin(void)
list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- if (wq->flags & WQ_FREEZEABLE)
+ if (cwq && wq->flags & WQ_FREEZEABLE)
cwq->max_active = 0;
}
@@ -3354,7 +3450,7 @@ bool freeze_workqueues_busy(void)
BUG_ON(!workqueue_freezing);
- for_each_possible_cpu(cpu) {
+ for_each_gcwq_cpu(cpu) {
struct workqueue_struct *wq;
/*
* nr_active is monotonically decreasing. It's safe
@@ -3363,7 +3459,7 @@ bool freeze_workqueues_busy(void)
list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- if (!(wq->flags & WQ_FREEZEABLE))
+ if (!cwq || !(wq->flags & WQ_FREEZEABLE))
continue;
BUG_ON(cwq->nr_active < 0);
@@ -3396,7 +3492,7 @@ void thaw_workqueues(void)
if (!workqueue_freezing)
goto out_unlock;
- for_each_possible_cpu(cpu) {
+ for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
struct workqueue_struct *wq;
@@ -3408,7 +3504,7 @@ void thaw_workqueues(void)
list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
- if (!(wq->flags & WQ_FREEZEABLE))
+ if (!cwq || !(wq->flags & WQ_FREEZEABLE))
continue;
/* restore max_active and repopulate worklist */
@@ -3451,12 +3547,14 @@ void __init init_workqueues(void)
hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
/* initialize gcwqs */
- for_each_possible_cpu(cpu) {
+ for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
spin_lock_init(&gcwq->lock);
INIT_LIST_HEAD(&gcwq->worklist);
gcwq->cpu = cpu;
+ if (cpu == WORK_CPU_UNBOUND)
+ gcwq->flags |= GCWQ_DISASSOCIATED;
INIT_LIST_HEAD(&gcwq->idle_list);
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
@@ -3476,7 +3574,7 @@ void __init init_workqueues(void)
}
/* create the initial worker */
- for_each_online_cpu(cpu) {
+ for_each_online_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
struct worker *worker;
@@ -3490,5 +3588,7 @@ void __init init_workqueues(void)
system_wq = alloc_workqueue("events", 0, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
+ system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
+ WQ_UNBOUND_MAX_ACTIVE);
BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
}
--
1.6.4.2
These four patches implement unbound workqueues, which can be used as a
simple execution context provider. I changed async to use them and will
also make fscache use them. This can be used by setting WQ_UNBOUND on
workqueue creation. Works queued to unbound workqueues are implicitly
HIGHPRI and dispatched to unbound workers as soon as resources are
available, and the only limitation applied by workqueue code is
@max_active. IOW, for both async and fscache, things will stay about
the same.
WQ_UNBOUND can serve the role of WQ_SINGLE_CPU. WQ_SINGLE_CPU is
dropped and replaced by WQ_UNBOUND.
Arjan, I still think we'll be better off using bound workqueues for
async, but let's first convert without causing any behavior difference.
Either way, it isn't gonna result in any noticeable difference anyway.
If you're okay with the conversion, please ack it.
David, this should work for fscache/slow-work the same way too. That
should relieve your concern, right? Oh, and Frederic suggested that
we would be better off with something based on the tracing API and I
agree, so the debugfs thing is currently dropped from the tree. What
do you think?
Thanks.
--
tejun
Drop WQ_SINGLE_CPU support and use WQ_UNBOUND instead. Note that most
single-threaded workqueue users will be converted to use multithreaded
or non-reentrant workqueues instead, and only the ones which require
strict ordering will keep using WQ_UNBOUND + @max_active of 1.
Signed-off-by: Tejun Heo <t...@kernel.org>
---
include/linux/workqueue.h | 7 +--
kernel/workqueue.c | 100 ++++++++-------------------------------------
2 files changed, 21 insertions(+), 86 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 67ce734..d74a529 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -233,12 +233,11 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
enum {
WQ_NON_REENTRANT = 1 << 0, /* guarantee non-reentrance */
- WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
+ WQ_UNBOUND = 1 << 1, /* not bound to any cpu */
WQ_FREEZEABLE = 1 << 2, /* freeze during suspend */
WQ_RESCUER = 1 << 3, /* has an rescue worker */
WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */
- WQ_UNBOUND = 1 << 6, /* not bound to any cpu */
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
WQ_MAX_UNBOUND_PER_CPU = 4, /* 4 * #cpus for unbound wq */
@@ -300,9 +299,9 @@ __alloc_workqueue_key(const char *name, unsigned int flags, int max_active,
#define create_workqueue(name) \
alloc_workqueue((name), WQ_RESCUER, 1)
#define create_freezeable_workqueue(name) \
- alloc_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU | WQ_RESCUER, 1)
+ alloc_workqueue((name), WQ_FREEZEABLE | WQ_UNBOUND | WQ_RESCUER, 1)
#define create_singlethread_workqueue(name) \
- alloc_workqueue((name), WQ_SINGLE_CPU | WQ_RESCUER, 1)
+ alloc_workqueue((name), WQ_UNBOUND | WQ_RESCUER, 1)
extern void destroy_workqueue(struct workqueue_struct *wq);
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 4608563..20d6237 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -206,8 +206,6 @@ struct workqueue_struct {
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */
- unsigned long single_cpu; /* cpu for single cpu wq */
-
cpumask_var_t mayday_mask; /* cpus requesting rescue */
struct worker *rescuer; /* I: rescue worker */
@@ -889,34 +887,6 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
wake_up_worker(gcwq);
}
-/**
- * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
- * @cwq: cwq to unbind
- *
- * Try to unbind @cwq from single cpu workqueue processing. If
- * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
- *
- * CONTEXT:
- * spin_lock_irq(gcwq->lock).
- */
-static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
-{
- struct workqueue_struct *wq = cwq->wq;
- struct global_cwq *gcwq = cwq->gcwq;
-
- BUG_ON(wq->single_cpu != gcwq->cpu);
- /*
- * Unbind from workqueue if @cwq is not frozen. If frozen,
- * thaw_workqueues() will either restart processing on this
- * cpu or unbind if empty. This keeps works queued while
- * frozen fully ordered and flushable.
- */
- if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
- smp_wmb(); /* paired with cmpxchg() in __queue_work() */
- wq->single_cpu = WORK_CPU_NONE;
- }
-}
-
static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
@@ -924,20 +894,16 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned long flags;
- bool arbitrate;
debug_work_activate(work);
- if (unlikely(cpu == WORK_CPU_UNBOUND))
- cpu = raw_smp_processor_id();
-
- /*
- * Determine gcwq to use. SINGLE_CPU is inherently
- * NON_REENTRANT, so test it first.
- */
- if (!(wq->flags & (WQ_SINGLE_CPU | WQ_UNBOUND))) {
+ /* determine gcwq to use */
+ if (!(wq->flags & WQ_UNBOUND)) {
struct global_cwq *last_gcwq;
+ if (unlikely(cpu == WORK_CPU_UNBOUND))
+ cpu = raw_smp_processor_id();
+
/*
* It's multi cpu. If @wq is non-reentrant and @work
* was previously on a different cpu, it might still
@@ -962,38 +928,6 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
}
} else
spin_lock_irqsave(&gcwq->lock, flags);
- } else if (!(wq->flags & WQ_UNBOUND)) {
- unsigned int req_cpu = cpu;
-
- /*
- * It's a bit more complex for single cpu workqueues.
- * We first need to determine which cpu is going to be
- * used. If no cpu is currently serving this
- * workqueue, arbitrate using atomic accesses to
- * wq->single_cpu; otherwise, use the current one.
- */
- retry:
- cpu = wq->single_cpu;
- arbitrate = cpu == WORK_CPU_NONE;
- if (arbitrate)
- cpu = req_cpu;
-
- gcwq = get_gcwq(cpu);
- spin_lock_irqsave(&gcwq->lock, flags);
-
- /*
- * The following cmpxchg() is a full barrier paired
- * with smp_wmb() in cwq_unbind_single_cpu() and
- * guarantees that all changes to wq->st_* fields are
- * visible on the new cpu after this point.
- */
- if (arbitrate)
- cmpxchg(&wq->single_cpu, WORK_CPU_NONE, cpu);
-
- if (unlikely(wq->single_cpu != cpu)) {
- spin_unlock_irqrestore(&gcwq->lock, flags);
- goto retry;
- }
} else {
gcwq = get_gcwq(WORK_CPU_UNBOUND);
spin_lock_irqsave(&gcwq->lock, flags);
@@ -1105,19 +1039,30 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work = &dwork->work;
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
- struct global_cwq *gcwq = get_work_gcwq(work);
- unsigned int lcpu = gcwq ? gcwq->cpu : raw_smp_processor_id();
+ unsigned int lcpu;
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
timer_stats_timer_set_start_info(&dwork->timer);
+
/*
* This stores cwq for the moment, for the timer_fn.
* Note that the work's gcwq is preserved to allow
* reentrance detection for delayed works.
*/
+ if (!(wq->flags & WQ_UNBOUND)) {
+ struct global_cwq *gcwq = get_work_gcwq(work);
+
+ if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
+ lcpu = gcwq->cpu;
+ else
+ lcpu = raw_smp_processor_id();
+ } else
+ lcpu = WORK_CPU_UNBOUND;
+
set_work_cwq(work, get_cwq(lcpu, wq), 0);
+
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
@@ -1696,9 +1641,6 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
/* one down, submit a delayed one */
if (cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
- } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
- /* this was the last work, unbind from single cpu */
- cwq_unbind_single_cpu(cwq);
}
/* is flush in progress and are we at the flushing tip? */
@@ -2751,7 +2693,6 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
- wq->single_cpu = WORK_CPU_NONE;
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
@@ -3513,11 +3454,6 @@ void thaw_workqueues(void)
while (!list_empty(&cwq->delayed_works) &&
cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
-
- /* perform delayed unbind from single cpu if empty */
- if (wq->single_cpu == gcwq->cpu &&
- !cwq->nr_active && list_empty(&cwq->delayed_works))
- cwq_unbind_single_cpu(cwq);
}
wake_up_worker(gcwq);
--
1.6.4.2
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@infradead.org>
---
kernel/async.c | 140 ++++++++-----------------------------------------------
1 files changed, 21 insertions(+), 119 deletions(-)
diff --git a/kernel/async.c b/kernel/async.c
index 15319d6..52acfc0 100644
static DECLARE_WAIT_QUEUE_HEAD(async_done);
-static DECLARE_WAIT_QUEUE_HEAD(async_new);
extern int initcall_debug;
spin_unlock_irqrestore(&async_lock, flags);
+ queue_work(system_unbound_wq, &entry->work);
+
return newcookie;
}
--
* It uses a dedicated gcwq with a pseudo CPU number WORK_CPU_UNBOUND.
This gcwq is always online and disassociated.
* Workers are not bound to any CPU and not concurrency managed. Works
are dispatched to workers as soon as possible and the only applied
limitation is @max_active. IOW, all unbound workqueues are
implicitly high priority.
Unbound workqueues can be used as a simple execution context provider.
Contexts unbound to any cpu are served as soon as possible.
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@linux.intel.com>
Cc: David Howells <dhow...@redhat.com>
---
include/linux/workqueue.h | 15 +++-
kernel/workqueue.c | 218 +++++++++++++++++++++++++++++++++------------
2 files changed, 173 insertions(+), 60 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 139069a..67ce734 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -51,7 +51,8 @@ enum {
WORK_NO_COLOR = WORK_NR_COLORS,
/* special cpu IDs */
- WORK_CPU_NONE = NR_CPUS,
+ WORK_CPU_UNBOUND = NR_CPUS,
+ WORK_CPU_NONE = NR_CPUS + 1,
WORK_CPU_LAST = WORK_CPU_NONE,
/*
@@ -237,11 +238,17 @@ enum {
WQ_RESCUER = 1 << 3, /* has an rescue worker */
WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */
+ WQ_UNBOUND = 1 << 6, /* not bound to any cpu */
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
+ WQ_MAX_UNBOUND_PER_CPU = 4, /* 4 * #cpus for unbound wq */
WQ_DFL_ACTIVE = WQ_MAX_ACTIVE / 2,
};
+/* unbound wq's aren't per-cpu, scale max_active according to #cpus */
+#define WQ_UNBOUND_MAX_ACTIVE \
+ max_t(int, WQ_MAX_ACTIVE, num_possible_cpus() * WQ_MAX_UNBOUND_PER_CPU)
+
/*
* System-wide workqueues which are always present.
*
@@ -256,10 +263,16 @@ enum {
* system_nrt_wq is non-reentrant and guarantees that any given work
* item is never executed in parallel by multiple CPUs. Queue
* flushing might take relatively long.
+ *
+ * system_unbound_wq is unbound workqueue. Workers are not bound to
+ * any specific CPU, not concurrency managed, and all queued works are
+ * executed immediately as long as max_active limit is not reached and
+ * resources are available.
*/
extern struct workqueue_struct *system_wq;
extern struct workqueue_struct *system_long_wq;
extern struct workqueue_struct *system_nrt_wq;
+extern struct workqueue_struct *system_unbound_wq;
extern struct workqueue_struct *
__alloc_workqueue_key(const char *name, unsigned int flags, int max_active,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index a105ddf..4608563 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
+ if (!(wq->flags & WQ_UNBOUND)) {
+ if (likely(cpu < nr_cpu_ids)) {
+#ifdef CONFIG_SMP
+ return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
#else
- return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
+ return wq->cpu_wq.single;
#endif
+ }
+ } else if (likely(cpu == WORK_CPU_UNBOUND))
+ return wq->cpu_wq.single;
+ return NULL;
}
static unsigned int work_color_to_flags(int color)
@@ -453,7 +512,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
if (cpu == WORK_CPU_NONE)
return NULL;
- BUG_ON(cpu >= nr_cpu_ids);
+ BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
return get_gcwq(cpu);
}
@@ -869,11 +928,14 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
debug_work_activate(work);
+ if (unlikely(cpu == WORK_CPU_UNBOUND))
+ cpu = raw_smp_processor_id();
+
/*
* Determine gcwq to use. SINGLE_CPU is inherently
* NON_REENTRANT, so test it first.
*/
- if (!(wq->flags & WQ_SINGLE_CPU)) {
+ if (!(wq->flags & (WQ_SINGLE_CPU | WQ_UNBOUND))) {
struct global_cwq *last_gcwq;
/*
@@ -900,7 +962,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
}
} else
spin_lock_irqsave(&gcwq->lock, flags);
- } else {
+ } else if (!(wq->flags & WQ_UNBOUND)) {
unsigned int req_cpu = cpu;
/*
struct workqueue_struct *wq = cwq->wq;
+ unsigned int cpu;
struct global_cwq *gcwq = cwq->gcwq;
struct work_struct *work, *n;
@@ -2034,7 +2119,7 @@ static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
atomic_set(&wq->nr_cwqs_to_flush, 1);
}
- for_each_possible_cpu(cpu) {
+ for_each_cwq_cpu(cpu, wq) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = cwq->gcwq;
@@ -2344,7 +2429,7 @@ static void wait_on_work(struct work_struct *work)
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
struct workqueue_struct *__alloc_workqueue_key(const char *name,
spin_lock(&workqueue_lock);
wq->saved_max_active = max_active;
BUG_ON(!workqueue_freezing);
--
* Add WORK_CPU_* constants for pseudo cpu id numbers used (currently
only WORK_CPU_NONE) and use them instead of NR_CPUS. This is to
allow another pseudo cpu id for unbound cpu.
* Reorder WQ_* flags.
* Make workqueue_struct->cpu_wq a union which contains a percpu
pointer, a regular pointer and an unsigned long value, and use
kzalloc/kfree() in the UP allocation path. This will be used to
implement unbound workqueues, which will use only one cwq on SMP.
* Move alloc_cwqs() allocation after initialization of wq fields, so
that alloc_cwqs() has access to wq->flags.
* Trivial relocation of wq local variables in freeze functions.
These changes don't cause any functional change.
Signed-off-by: Tejun Heo <t...@kernel.org>
---
include/linux/workqueue.h | 10 ++++--
kernel/workqueue.c | 83 +++++++++++++++++++++++---------------------
2 files changed, 50 insertions(+), 43 deletions(-)
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 3f36d37..139069a 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -50,6 +50,10 @@ enum {
WORK_NR_COLORS = (1 << WORK_STRUCT_COLOR_BITS) - 1,
WORK_NO_COLOR = WORK_NR_COLORS,
+ /* special cpu IDs */
+ WORK_CPU_NONE = NR_CPUS,
+ WORK_CPU_LAST = WORK_CPU_NONE,
+
/*
* Reserve 6 bits off of cwq pointer w/ debugobjects turned
* off. This makes cwqs aligned to 64 bytes which isn't too
@@ -60,7 +64,7 @@ enum {
WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1,
WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
- WORK_STRUCT_NO_CPU = NR_CPUS << WORK_STRUCT_FLAG_BITS,
+ WORK_STRUCT_NO_CPU = WORK_CPU_NONE << WORK_STRUCT_FLAG_BITS,
/* bit mask for work_busy() return values */
WORK_BUSY_PENDING = 1 << 0,
@@ -227,9 +231,9 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
clear_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))
enum {
- WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
+ WQ_NON_REENTRANT = 1 << 0, /* guarantee non-reentrance */
WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
- WQ_NON_REENTRANT = 1 << 2, /* guarantee non-reentrance */
+ WQ_FREEZEABLE = 1 << 2, /* freeze during suspend */
WQ_RESCUER = 1 << 3, /* has an rescue worker */
WQ_HIGHPRI = 1 << 4, /* high priority */
WQ_CPU_INTENSIVE = 1 << 5, /* cpu instensive workqueue */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 2eb9fbd..a105ddf 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -190,7 +190,11 @@ struct wq_flusher {
*/
struct workqueue_struct {
unsigned int flags; /* I: WQ_* flags */
- struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
+ union {
+ struct cpu_workqueue_struct __percpu *pcpu;
+ struct cpu_workqueue_struct *single;
+ unsigned long v;
+ } cpu_wq; /* I: cwq's */
struct list_head list; /* W: list of all workqueues */
struct mutex flush_mutex; /* protects wq flushing */
@@ -362,7 +366,11 @@ static atomic_t *get_gcwq_nr_running(unsigned int cpu)
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
- return per_cpu_ptr(wq->cpu_wq, cpu);
+#ifndef CONFIG_SMP
+ return wq->cpu_wq.single;
+#else
+ return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
+#endif
}
static unsigned int work_color_to_flags(int color)
@@ -442,7 +450,7 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
return ((struct cpu_workqueue_struct *)data)->gcwq;
cpu = data >> WORK_STRUCT_FLAG_BITS;
- if (cpu == NR_CPUS)
+ if (cpu == WORK_CPU_NONE)
return NULL;
BUG_ON(cpu >= nr_cpu_ids);
@@ -846,7 +854,7 @@ static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
*/
if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
smp_wmb(); /* paired with cmpxchg() in __queue_work() */
- wq->single_cpu = NR_CPUS;
+ wq->single_cpu = WORK_CPU_NONE;
}
}
@@ -904,7 +912,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
*/
retry:
cpu = wq->single_cpu;
- arbitrate = cpu == NR_CPUS;
+ arbitrate = cpu == WORK_CPU_NONE;
if (arbitrate)
cpu = req_cpu;
@@ -918,7 +926,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
* visible on the new cpu after this point.
*/
if (arbitrate)
- cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
+ cmpxchg(&wq->single_cpu, WORK_CPU_NONE, cpu);
if (unlikely(wq->single_cpu != cpu)) {
spin_unlock_irqrestore(&gcwq->lock, flags);
@@ -2572,7 +2580,7 @@ int keventd_up(void)
return system_wq != NULL;
}
-static struct cpu_workqueue_struct *alloc_cwqs(void)
+static int alloc_cwqs(struct workqueue_struct *wq)
{
/*
* cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
@@ -2582,40 +2590,36 @@ static struct cpu_workqueue_struct *alloc_cwqs(void)
const size_t size = sizeof(struct cpu_workqueue_struct);
const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
__alignof__(unsigned long long));
- struct cpu_workqueue_struct *cwqs;
#ifndef CONFIG_SMP
void *ptr;
/*
- * On UP, percpu allocator doesn't honor alignment parameter
- * and simply uses arch-dependent default. Allocate enough
- * room to align cwq and put an extra pointer at the end
- * pointing back to the originally allocated pointer which
- * will be used for free.
- *
- * FIXME: This really belongs to UP percpu code. Update UP
- * percpu code to honor alignment and remove this ugliness.
+ * Allocate enough room to align cwq and put an extra pointer
+ * at the end pointing back to the originally allocated
+ * pointer which will be used for free.
*/
- ptr = __alloc_percpu(size + align + sizeof(void *), 1);
- cwqs = PTR_ALIGN(ptr, align);
- *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
+ ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
+ if (ptr) {
+ wq->cpu_wq.single = PTR_ALIGN(ptr, align);
+ *(void **)(wq->cpu_wq.single + 1) = ptr;
+ }
#else
- /* On SMP, percpu allocator can do it itself */
- cwqs = __alloc_percpu(size, align);
+ /* On SMP, percpu allocator can align itself */
+ wq->cpu_wq.pcpu = __alloc_percpu(size, align);
#endif
/* just in case, make sure it's actually aligned */
- BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
- return cwqs;
+ BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align));
+ return wq->cpu_wq.v ? 0 : -ENOMEM;
}
-static void free_cwqs(struct cpu_workqueue_struct *cwqs)
+static void free_cwqs(struct workqueue_struct *wq)
{
#ifndef CONFIG_SMP
/* on UP, the pointer to free is stored right after the cwq */
- if (cwqs)
- free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
+ if (wq->cpu_wq.single)
+ kfree(*(void **)(wq->cpu_wq.single + 1));
#else
- free_percpu(cwqs);
+ free_percpu(wq->cpu_wq.pcpu);
#endif
}
@@ -2645,22 +2649,21 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
if (!wq)
goto err;
- wq->cpu_wq = alloc_cwqs();
- if (!wq->cpu_wq)
- goto err;
-
wq->flags = flags;
wq->saved_max_active = max_active;
mutex_init(&wq->flush_mutex);
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
- wq->single_cpu = NR_CPUS;
+ wq->single_cpu = WORK_CPU_NONE;
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
+ if (alloc_cwqs(wq) < 0)
+ goto err;
+
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = get_gcwq(cpu);
@@ -2710,7 +2713,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
return wq;
err:
if (wq) {
- free_cwqs(wq->cpu_wq);
+ free_cwqs(wq);
free_cpumask_var(wq->mayday_mask);
kfree(wq->rescuer);
kfree(wq);
@@ -2755,7 +2758,7 @@ void destroy_workqueue(struct workqueue_struct *wq)
free_cpumask_var(wq->mayday_mask);
}
- free_cwqs(wq->cpu_wq);
+ free_cwqs(wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
@@ -2821,13 +2824,13 @@ EXPORT_SYMBOL_GPL(workqueue_congested);
* @work: the work of interest
*
* RETURNS:
- * CPU number if @work was ever queued. NR_CPUS otherwise.
+ * CPU number if @work was ever queued. WORK_CPU_NONE otherwise.
*/
unsigned int work_cpu(struct work_struct *work)
{
struct global_cwq *gcwq = get_work_gcwq(work);
- return gcwq ? gcwq->cpu : NR_CPUS;
+ return gcwq ? gcwq->cpu : WORK_CPU_NONE;
}
EXPORT_SYMBOL_GPL(work_cpu);
@@ -3300,7 +3303,6 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
*/
void freeze_workqueues_begin(void)
{
- struct workqueue_struct *wq;
unsigned int cpu;
spin_lock(&workqueue_lock);
@@ -3310,6 +3312,7 @@ void freeze_workqueues_begin(void)
for_each_possible_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
+ struct workqueue_struct *wq;
spin_lock_irq(&gcwq->lock);
@@ -3344,7 +3347,6 @@ void freeze_workqueues_begin(void)
*/
bool freeze_workqueues_busy(void)
{
- struct workqueue_struct *wq;
unsigned int cpu;
bool busy = false;
@@ -3353,6 +3355,7 @@ bool freeze_workqueues_busy(void)
BUG_ON(!workqueue_freezing);
for_each_possible_cpu(cpu) {
+ struct workqueue_struct *wq;
/*
* nr_active is monotonically decreasing. It's safe
* to peek without lock.
@@ -3386,7 +3389,6 @@ out_unlock:
*/
void thaw_workqueues(void)
{
- struct workqueue_struct *wq;
unsigned int cpu;
spin_lock(&workqueue_lock);
@@ -3396,6 +3398,7 @@ void thaw_workqueues(void)
for_each_possible_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
+ struct workqueue_struct *wq;
spin_lock_irq(&gcwq->lock);
@@ -3443,7 +3446,7 @@ void __init init_workqueues(void)
* sure cpu number won't overflow into kernel pointer area so
* that they can be distinguished.
*/
- BUILD_BUG_ON(NR_CPUS << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);
+ BUILD_BUG_ON(WORK_CPU_LAST << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);
hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
--
1.6.4.2
Oops, forgot something. These four patches are on top of the
wq#for-next-candidate branch, which is cmwq take#6 + four fix patches:
git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git for-next-candidate
They are available in the following git tree:
git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git review-cmwq
system_unbound_wq actually.
> --- a/kernel/async.c
> +++ b/kernel/async.c
> @@ -49,40 +49,32 @@ asynchronous and synchronous parts of the kernel.
> */
>
> #include <linux/async.h>
> -#include <linux/bug.h>
> #include <linux/module.h>
> #include <linux/wait.h>
> #include <linux/sched.h>
> -#include <linux/init.h>
> -#include <linux/kthread.h>
> -#include <linux/delay.h>
> #include <linux/slab.h>
> #include <asm/atomic.h>
>
Shouldn't it include linux/workqueue.h now?
--
Stefan Richter
-=====-==-=- -=== ---=-
http://arcgraph.de/sr/
Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@infradead.org>
---
workqueue.h include added and patch description updated. git branch
updated accordingly.
Thanks.
kernel/async.c | 141 ++++++++-------------------------------------------------
1 file changed, 22 insertions(+), 119 deletions(-)
Index: work/kernel/async.c
===================================================================
--- work.orig/kernel/async.c
+++ work/kernel/async.c
@@ -49,40 +49,33 @@ asynchronous and synchronous parts of th
*/
#include <linux/async.h>
-#include <linux/bug.h>
#include <linux/module.h>
#include <linux/wait.h>
#include <linux/sched.h>
-#include <linux/init.h>
-#include <linux/kthread.h>
-#include <linux/delay.h>
#include <linux/slab.h>
+#include <linux/workqueue.h>
#include <asm/atomic.h>
static DECLARE_WAIT_QUEUE_HEAD(async_done);
-static DECLARE_WAIT_QUEUE_HEAD(async_new);
extern int initcall_debug;
@@ -117,27 +110,23 @@ static async_cookie_t lowest_in_progres
@@ -153,31 +142,25 @@ static void run_one_entry(void)
(long long)ktime_to_ns(delta) >> 10);
}
spin_unlock_irqrestore(&async_lock, flags);
@@ -186,7 +169,7 @@ static async_cookie_t __async_schedule(a
* If we're out of memory or if there's too much work
* pending already, we execute synchronously.
*/
- if (!async_enabled || !entry || atomic_read(&entry_count) > MAX_WORK) {
+ if (!entry || atomic_read(&entry_count) > MAX_WORK) {
kfree(entry);
spin_lock_irqsave(&async_lock, flags);
newcookie = next_cookie++;
@@ -196,6 +179,7 @@ static async_cookie_t __async_schedule(a
ptr(data, newcookie);
return newcookie;
}
+ INIT_WORK(&entry->work, async_run_entry_fn);
entry->func = ptr;
entry->data = data;
entry->running = running;
@@ -205,7 +189,10 @@ static async_cookie_t __async_schedule(a
list_add_tail(&entry->list, &async_pending);
atomic_inc(&entry_count);
spin_unlock_irqrestore(&async_lock, flags);
- wake_up(&async_new);
+
+ /* schedule for execution */
+ queue_work(system_unbound_wq, &entry->work);
+
return newcookie;
}
@@ -312,87 +299,3 @@ void async_synchronize_cookie(async_cook
Updated. Thanks a lot.
--
tejun
Ping, Arjan.
On 07/07/2010 07:41 AM, Tejun Heo wrote:
> On 07/02/2010 11:17 AM, Tejun Heo wrote:
>> Arjan, I still think we'll be better off using bound workqueues for
>> async but let's first convert without causing behavior difference.
>> Either way isn't gonna result in any noticeable difference anyway. If
>> you're okay with the conversion, please ack it.
>
> Ping, Arjan.
Just for the record, I pinged Arjan again offlist and Arjan acked the
conversion in the reply. Added Acked-by and pushed the conversion to
for-next-candidate, which will be pushed into linux-next next week.
> David, this should work for fscache/slow-work the same way too. That
> should relieve your concern, right?
Not at the moment. What does this mean:
* Unbound workqueues aren't concurrency managed and should be
* dispatched to workers immediately.
Does this mean you don't get reentrancy guarantees with unbounded work queues?
I can't work out how you're achieving it with unbounded queues. I presume with
CPU-bound workqueues you're doing it by binding the work item to the current CPU
still...
Btw, how does this fare in an RT system, where work items bound to a CPU can't
get executed because their CPU is busy with an RT thread, even though there are
other, idle CPUs?
> Oh, and Frederic suggested that we would be better off with something based
> on tracing API and I agree, so the debugfs thing is currently dropped from
> the tree. What do you think?
I probably disagree. I just want to be able to cat a file and see the current
runqueue state. I don't want to have to write and distribute a special program
to do this. Of course, I don't know that much about the tracing API, so
cat'ing a file to get the runqueue listed nicely may be possible with that.
David
"David Howells" <dhow...@redhat.com> wrote:
> Does this mean you don't get reentrancy guarantees with unbounded work queues?
It means that an unbound wq behaves like a generic worker pool. A bound wq limits concurrency to the minimal level, but an unbound one executes works as long as resources are available. I'll continue below.
>I can't work out how you're achieving it with unbounded queues. I presume with
>CPU-bound workqueues you're doing it by binding the work item to the current CPU
>still...
Unbound works are served by a dedicated gcwq whose workers are not affine to any particular CPU. As all unbound works are served by the same gcwq, non-reentrancy is automatically guaranteed.
>Btw, how does this fare in an RT system, where work items bound to a CPU can't
>get executed because their CPU is busy with an RT thread, even though there are
>other, idle CPUs?
Sure, there's nothing special about unbound workers. They're just normal kthreads.
>> Oh, and Frederic suggested that we would be better off with something based
>> on tracing API and I agree, so the debugfs thing is currently dropped from
>> the tree. What do you think?
>
>I probably disagree. I just want to be able to cat a file and see the current
>runqueue state. I don't want to have to write and distribute a special program
>to do this. Of course, I don't know that much about the tracing API, so
>cat'ing a file to get the runqueue listed nicely may be possible with that.
I'm relatively sure we can do that. Frederic?
Thanks.
--
tejun
> As all unbound works are served by the same gcwq, non reentrancy is
> automatically guaranteed.
That doesn't actually explain _how_ it's non-reentrant. The gcwq includes a
collection of threads that can execute from it, right? If so, what mechanism
prevents two threads from executing the same work item, if that work item
isn't bound to a CPU? I've been trying to figure this out from the code, but
I don't see it offhand.
> > Btw, how does this fare in an RT system, where work items bound to a CPU
> > can't get executed because their CPU is busy with an RT thread, even
> > though there are other, idle CPUs?
>
> Sure, there's nothing special about unbound workers. They're just normal
> kthreads.
I should've been clearer: As I understand it, normal (unbound) worker items
are bound to the CPU on which they were queued, and will be executed there
only (barring CPU removal). If that's the case, isn't it possible that work
items can be prevented from getting execution time by an RT thread that's
hogging a CPU and won't let go?
David
On 07/21/2010 04:59 PM, Tejun Heo wrote:
>> I should've been clearer: As I understand it, normal (unbound) worker items
In workqueue land, normal workqueues would be bound to CPUs while
workers for WQ_UNBOUND workqueues aren't affined to any specific CPU.
Thanks.
--
tejun
On 07/21/2010 03:08 PM, David Howells wrote:
> Tejun Heo <t...@kernel.org> wrote:
>
>> As all unbound works are served by the same gcwq, non reentrancy is
>> automatically guaranteed.
>
> That doesn't actually explain _how_ it's non-reentrant. The gcwq includes a
> collection of threads that can execute from it, right? If so, what mechanism
> prevents two threads from executing the same work item, if that work item
> isn't bound to a CPU? I've been trying to figure this out from the code, but
> I don't see it offhand.
Sharing the same gcwq is why workqueues bound to one CPU have
non-reentrancy, so they're using the same mechanism. If it doesn't
work for unbound workqueues, the normal ones are broken too. Each
gcwq keeps track of currently running works in a hash table and checks
whether the work in question is already executing before starting to
execute it. It's a bit complex, but as a work_struct may be freed
once execution starts, the status needs to be tracked outside of it.
>>> Btw, how does this fare in an RT system, where work items bound to a CPU
>>> can't get executed because their CPU is busy with an RT thread, even
>>> though there are other, idle CPUs?
>>
>> Sure, there's nothing special about unbound workers. They're just normal
>> kthreads.
>
> I should've been clearer: As I understand it, normal (unbound) worker items
> are bound to the CPU on which they were queued, and will be executed there
> only (barring CPU removal). If that's the case, isn't it possible that work
> items can be prevented from getting execution time by an RT thread that's
> hogging a CPU and won't let go?
Yeah, for bound workqueues, sure. That's exactly the same as the
original workqueue implementation. For unbound workqueues, it doesn't
matter.
Thanks.
--
tejun
> Each gcwq keeps track of currently running works in a hash table and looks
> whether the work in question is already executing before starting executing
> it. It's a bit complex but as a work_struct may be freed once execution
> starts, the status needs to be tracked outside.
Thanks, that's what I wanted to know.
I presume this survives an executing work_struct being freed, reallocated and
requeued before the address of the work_struct is removed from the hash table?
I can see at least one way of doing this: marking the work_struct address in
the hash when the address becomes pending again so that the process of hash
removal will cause the work_struct to be requeued automatically.
David
On 07/21/2010 05:25 PM, David Howells wrote:
>> Each gcwq keeps track of currently running works in a hash table and looks
>> whether the work in question is already executing before starting executing
>> it. It's a bit complex but as a work_struct may be freed once execution
>> starts, the status needs to be tracked outside.
>
> Thanks, that's what I wanted to know.
>
> I presume this survives an executing work_struct being freed, reallocated and
> requeued before the address of the work_struct is removed from the hash table?
It will unnecessarily stall the execution of the new work if the last
work is still running but nothing will be broken correctness-wise.
> I can see at least one way of doing this: marking the work_struct address in
> the hash when the address becomes pending again so that the process of hash
> removal will cause the work_struct to be requeued automatically.
If I'm correctly understanding what you're saying, the code already
does about the same thing.
Thanks.
--
tejun
> If I'm correctly understanding what you're saying, the code already
> does about the same thing.
Cool.
Btw, it seems to work for fscache. Feel free to add my Acked-by to your
patches.
David
> It will unnecessarily stall the execution of the new work if the last
> work is still running but nothing will be broken correctness-wise.
That's fine. Better that than risk unexpected reentrance. You could add a
function to allow an executing work item to yield the hash entry to indicate
that the work_item that invoked it has been destroyed, but it's probably not
worth it, and it has scope for mucking things up horribly if used at the wrong
time.
I presume also that if a work_item being executed on one work queue is queued
on another work queue, then there is no non-reentrancy guarantee (which is
fine; if you don't like that, don't do it).
David
On 07/21/2010 05:38 PM, David Howells wrote:
> Tejun Heo <t...@kernel.org> wrote:
>
>> If I'm correctly understanding what you're saying, the code already
>> does about the same thing.
>
> Cool.
>
> Btw, it seems to work for fscache. Feel free to add my Acked-by to your
> patches.
Great, I'll start working on the debugging stuff once things settle
down a bit. Thank you.
--
tejun
On 07/21/2010 05:45 PM, David Howells wrote:
> That's fine. Better that than risk unexpected reentrance. You could add a
> function to allow an executing work item to yield the hash entry to indicate
> that the work_item that invoked it has been destroyed, but it's probably not
> worth it, and it has scope for mucking things up horribly if used at the wrong
> time.
Yeah, I agree, it's going too far and can be easily misused. Given
that there are very few users which actually do that, I think it would
be best to leave it alone.
> I presume also that if a work_item being executed on one work queue is queued
> on another work queue, then there is no non-reentrancy guarantee (which is
> fine; if you don't like that, don't do it).
Right, there is no non-reentrancy guarantee.
Thanks.
--
tejun