Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

[PATCH 30/40] workqueue: implement work_busy()

32 views
Skip to first unread message

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
Implement work_busy() which tests whether a work is either pending or
running. The test isn't synchronized against workqueue operation and
the result is only good as advisory hints or for debugging.

Signed-off-by: Tejun Heo <t...@kernel.org>
---
include/linux/workqueue.h | 2 +-
kernel/workqueue.c | 31 +++++++++++++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 265207d..4f8705f 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -293,8 +293,8 @@ extern int keventd_up(void);
extern void init_workqueues(void);
int execute_in_process_context(work_func_t fn, struct execute_work *);

+extern bool work_busy(struct work_struct *work);
extern int flush_work(struct work_struct *work);
-
extern int cancel_work_sync(struct work_struct *work);

/*
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 233278c..882d4d8 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -1960,6 +1960,37 @@ out_unlock:
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
+ * work_busy - test whether a work is currently pending or running
+ * @work: the work to be tested
+ *
+ * Test whether @work is currently pending or running. There is no
+ * synchronization around this function and the test result is
+ * unreliable and only useful as advisory hints or for debugging. The
+ * caller is responsible for ensuring the workqueue @work was last
+ * queued on stays valid until this function returns.
+ *
+ * RETURNS:
+ * %true if @work is currently pending or running, %false otherwise.
+ */
+bool work_busy(struct work_struct *work)
+{
+ struct cpu_workqueue_struct *cwq = get_wq_data(work);
+ struct global_cwq *gcwq;
+ unsigned long flags;
+ bool ret;
+
+ if (!cwq)
+ return false;
+ gcwq = cwq->gcwq;
+
+ spin_lock_irqsave(&gcwq->lock, flags);
+ ret = work_pending(work) || find_worker_executing_work(gcwq, work);
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+ return ret;
+}
+EXPORT_SYMBOL_GPL(work_busy);
+
+/**
* flush_work - block until a work_struct's callback has terminated
* @work: the work which is to be flushed
*
--
1.6.4.2

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majo...@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
Currently, workqueue freezing is implemented by marking the worker
freezeable and calling try_to_freeze() from dispatch loop.
Reimplement it using cwq->max_active so that the workqueue is frozen
instead of the worker.

* workqueue_struct->saved_max_active is added which stores the
specified max_active on initialization.

* On freeze, all cwq->max_active's are quenched to zero. Freezing is
complete when nr_active on all cwqs reaches zero.

* On thaw, all cwq->max_active's are restored to wq->saved_max_active
and the worklist is repopulated.

This new implementation allows having a single shared pool of workers
per cpu.

Signed-off-by: Tejun Heo <t...@kernel.org>
---

include/linux/workqueue.h | 7 ++
kernel/power/process.c | 21 +++++-
kernel/workqueue.c | 163 ++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 179 insertions(+), 12 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 974a232..7a260df 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -331,4 +331,11 @@ static inline long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
#else
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg);
#endif /* CONFIG_SMP */
+
+#ifdef CONFIG_FREEZER
+extern void freeze_workqueues_begin(void);
+extern bool freeze_workqueues_busy(void);
+extern void thaw_workqueues(void);
+#endif /* CONFIG_FREEZER */
+
#endif
diff --git a/kernel/power/process.c b/kernel/power/process.c
index 5ade1bd..6f89afd 100644
--- a/kernel/power/process.c
+++ b/kernel/power/process.c
@@ -15,6 +15,7 @@
#include <linux/syscalls.h>
#include <linux/freezer.h>
#include <linux/delay.h>
+#include <linux/workqueue.h>

/*
* Timeout for stopping processes
@@ -35,6 +36,7 @@ static int try_to_freeze_tasks(bool sig_only)
struct task_struct *g, *p;
unsigned long end_time;
unsigned int todo;
+ bool wq_busy = false;
struct timeval start, end;
u64 elapsed_csecs64;
unsigned int elapsed_csecs;
@@ -42,6 +44,10 @@ static int try_to_freeze_tasks(bool sig_only)
do_gettimeofday(&start);

end_time = jiffies + TIMEOUT;
+
+ if (!sig_only)
+ freeze_workqueues_begin();
+
while (true) {
todo = 0;
read_lock(&tasklist_lock);
@@ -63,6 +69,12 @@ static int try_to_freeze_tasks(bool sig_only)
todo++;
} while_each_thread(g, p);
read_unlock(&tasklist_lock);
+
+ if (!sig_only) {
+ wq_busy = freeze_workqueues_busy();
+ todo += wq_busy;
+ }
+
if (!todo || time_after(jiffies, end_time))
break;

@@ -86,9 +98,13 @@ static int try_to_freeze_tasks(bool sig_only)
*/
printk("\n");
printk(KERN_ERR "Freezing of tasks failed after %d.%02d seconds "
- "(%d tasks refusing to freeze):\n",
- elapsed_csecs / 100, elapsed_csecs % 100, todo);
+ "(%d tasks refusing to freeze, wq_busy=%d):\n",
+ elapsed_csecs / 100, elapsed_csecs % 100,
+ todo - wq_busy, wq_busy);
show_state();
+
+ thaw_workqueues();
+
read_lock(&tasklist_lock);
do_each_thread(g, p) {
task_lock(p);
@@ -158,6 +174,7 @@ void thaw_processes(void)
oom_killer_enable();

printk("Restarting tasks ... ");
+ thaw_workqueues();
thaw_tasks(true);
thaw_tasks(false);
schedule();
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 0c9c01d..eca3925 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -78,7 +78,7 @@ struct cpu_workqueue_struct {
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
int nr_active; /* L: nr of active works */
- int max_active; /* I: max active works */
+ int max_active; /* L: max active works */
struct list_head delayed_works; /* L: delayed works */
};

@@ -108,6 +108,7 @@ struct workqueue_struct {
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */

+ int saved_max_active; /* I: saved cwq max_active */
const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
@@ -228,6 +229,7 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
static DEFINE_PER_CPU(struct ida, worker_ida);
+static bool workqueue_freezing; /* W: have wqs started freezing? */

static int worker_thread(void *__worker);

@@ -739,19 +741,13 @@ static int worker_thread(void *__worker)
struct cpu_workqueue_struct *cwq = worker->cwq;
DEFINE_WAIT(wait);

- if (cwq->wq->flags & WQ_FREEZEABLE)
- set_freezable();
-
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
- if (!freezing(current) &&
- !kthread_should_stop() &&
+ if (!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);

- try_to_freeze();
-
if (kthread_should_stop())
break;

@@ -1504,6 +1500,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
goto err;

wq->flags = flags;
+ wq->saved_max_active = max_active;
mutex_init(&wq->flush_mutex);
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
@@ -1548,8 +1545,19 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
wq = NULL;
}

+ /*
+ * workqueue_lock protects global freeze state and workqueues
+ * list. Grab it, set max_active accordingly and add the new
+ * workqueue to workqueues list.
+ */
spin_lock(&workqueue_lock);
+
+ if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
+ for_each_possible_cpu(cpu)
+ get_cwq(cpu, wq)->max_active = 0;
+
list_add(&wq->list, &workqueues);
+
spin_unlock(&workqueue_lock);

return wq;
@@ -1572,12 +1580,16 @@ void destroy_workqueue(struct workqueue_struct *wq)
{
int cpu;

+ flush_workqueue(wq);
+
+ /*
+ * wq list is used to freeze wq, remove from list after
+ * flushing is complete in case freeze races us.
+ */
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);

- flush_workqueue(wq);
-
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
int i;
@@ -1676,6 +1688,137 @@ long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */

+#ifdef CONFIG_FREEZER
+
+/**
+ * freeze_workqueues_begin - begin freezing workqueues
+ *
+ * Start freezing workqueues. After this function returns, all
+ * freezeable workqueues will queue new works to their delayed_works
+ * list instead of the cwq worklist.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and cwq->lock's.
+ */
+void freeze_workqueues_begin(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+
+ spin_lock(&workqueue_lock);
+
+ BUG_ON(workqueue_freezing);
+ workqueue_freezing = true;
+
+ for_each_possible_cpu(cpu) {
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ spin_lock_irq(&cwq->lock);
+
+ if (wq->flags & WQ_FREEZEABLE)
+ cwq->max_active = 0;
+
+ spin_unlock_irq(&cwq->lock);
+ }
+ }
+
+ spin_unlock(&workqueue_lock);
+}
+
+/**
+ * freeze_workqueues_busy - are freezeable workqueues still busy?
+ *
+ * Check whether freezing is complete. This function must be called
+ * between freeze_workqueues_begin() and thaw_workqueues().
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock.
+ *
+ * RETURNS:
+ * %true if some freezeable workqueues are still busy. %false if
+ * freezing is complete.
+ */
+bool freeze_workqueues_busy(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+ bool busy = false;
+
+ spin_lock(&workqueue_lock);
+
+ BUG_ON(!workqueue_freezing);
+
+ for_each_possible_cpu(cpu) {
+ /*
+ * nr_active is monotonically decreasing. It's safe
+ * to peek without lock.
+ */
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ BUG_ON(cwq->nr_active < 0);
+ if (cwq->nr_active) {
+ busy = true;
+ goto out_unlock;
+ }
+ }
+ }
+out_unlock:
+ spin_unlock(&workqueue_lock);
+ return busy;
+}
+
+/**
+ * thaw_workqueues - thaw workqueues
+ *
+ * Thaw workqueues. Normal queueing is restored and all collected
+ * frozen works are transferred to their respective cwq worklists.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and cwq->lock's.
+ */
+void thaw_workqueues(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+
+ spin_lock(&workqueue_lock);
+
+ if (!workqueue_freezing)
+ goto out_unlock;
+
+ for_each_possible_cpu(cpu) {
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ spin_lock_irq(&cwq->lock);
+
+ /* restore max_active and repopulate worklist */
+ cwq->max_active = wq->saved_max_active;
+
+ while (!list_empty(&cwq->delayed_works) &&
+ cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
+
+ wake_up(&cwq->more_work);
+
+ spin_unlock_irq(&cwq->lock);
+ }
+ }
+
+ workqueue_freezing = false;
+out_unlock:
+ spin_unlock(&workqueue_lock);
+}
+#endif /* CONFIG_FREEZER */
+
void __init init_workqueues(void)
{
unsigned int cpu;

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
Factor ttwu_activate() and ttwu_post_activation() out of try_to_wake_up().
The factoring out doesn't affect try_to_wake_up() much
code-generation-wise. Depending on configuration options, it ends up
generating the same object code as before or slightly different one
due to different register assignment.

This is to help future implementation of try_to_wake_up_local().

Mike Galbraith suggested renaming ttwu_woken_up() to
ttwu_post_activation() and updating the comment in try_to_wake_up().

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Peter Zijlstra <pet...@infradead.org>
Cc: Mike Galbraith <efa...@gmx.de>
Cc: Ingo Molnar <mi...@elte.hu>
---
kernel/sched.c | 114 +++++++++++++++++++++++++++++++------------------------
1 files changed, 64 insertions(+), 50 deletions(-)

diff --git a/kernel/sched.c b/kernel/sched.c
index 768d313..7633915 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -2391,11 +2391,67 @@ int select_task_rq(struct task_struct *p, int sd_flags, int wake_flags)
}
#endif

-/***
+static inline void ttwu_activate(struct task_struct *p, struct rq *rq,
+ bool is_sync, bool is_migrate, bool is_local)
+{
+ schedstat_inc(p, se.nr_wakeups);
+ if (is_sync)
+ schedstat_inc(p, se.nr_wakeups_sync);
+ if (is_migrate)
+ schedstat_inc(p, se.nr_wakeups_migrate);
+ if (is_local)
+ schedstat_inc(p, se.nr_wakeups_local);
+ else
+ schedstat_inc(p, se.nr_wakeups_remote);
+
+ activate_task(rq, p, 1);
+
+ /*
+ * Only attribute actual wakeups done by this task.
+ */
+ if (!in_interrupt()) {
+ struct sched_entity *se = &current->se;
+ u64 sample = se->sum_exec_runtime;
+
+ if (se->last_wakeup)
+ sample -= se->last_wakeup;
+ else
+ sample -= se->start_runtime;
+ update_avg(&se->avg_wakeup, sample);
+
+ se->last_wakeup = se->sum_exec_runtime;
+ }
+}
+
+static inline void ttwu_post_activation(struct task_struct *p, struct rq *rq,
+ int wake_flags, bool success)
+{
+ trace_sched_wakeup(rq, p, success);
+ check_preempt_curr(rq, p, wake_flags);
+
+ p->state = TASK_RUNNING;
+#ifdef CONFIG_SMP
+ if (p->sched_class->task_woken)
+ p->sched_class->task_woken(rq, p);
+
+ if (unlikely(rq->idle_stamp)) {
+ u64 delta = rq->clock - rq->idle_stamp;
+ u64 max = 2*sysctl_sched_migration_cost;
+
+ if (delta > max)
+ rq->avg_idle = max;
+ else
+ update_avg(&rq->avg_idle, delta);
+ rq->idle_stamp = 0;
+ }
+#endif
+}
+
+/**
* try_to_wake_up - wake up a thread
- * @p: the to-be-woken-up thread
+ * @p: the thread to be awakened
* @state: the mask of task states that can be woken
- * @sync: do a synchronous wakeup?
+ * @wake_flags: wake modifier flags (WF_*)
*
* Put it on the run-queue if it's not already there. The "current"
* thread is always on the run-queue (except when the actual
@@ -2403,7 +2459,8 @@ int select_task_rq(struct task_struct *p, int sd_flags, int wake_flags)
* the simpler "current->state = TASK_RUNNING" to mark yourself
* runnable without the overhead of this.
*
- * returns failure only if the task is already active.
+ * Returns %true if @p was woken up, %false if it was already running
+ * or @state didn't match @p's state.
*/
static int try_to_wake_up(struct task_struct *p, unsigned int state,
int wake_flags)
@@ -2475,54 +2532,11 @@ static int try_to_wake_up(struct task_struct *p, unsigned int state,

out_activate:
#endif /* CONFIG_SMP */
- schedstat_inc(p, se.nr_wakeups);
- if (wake_flags & WF_SYNC)
- schedstat_inc(p, se.nr_wakeups_sync);
- if (orig_cpu != cpu)
- schedstat_inc(p, se.nr_wakeups_migrate);
- if (cpu == this_cpu)
- schedstat_inc(p, se.nr_wakeups_local);
- else
- schedstat_inc(p, se.nr_wakeups_remote);
- activate_task(rq, p, 1);
+ ttwu_activate(p, rq, wake_flags & WF_SYNC, orig_cpu != cpu,
+ cpu == this_cpu);
success = 1;
-
- /*
- * Only attribute actual wakeups done by this task.
- */
- if (!in_interrupt()) {
- struct sched_entity *se = &current->se;
- u64 sample = se->sum_exec_runtime;
-
- if (se->last_wakeup)
- sample -= se->last_wakeup;
- else
- sample -= se->start_runtime;
- update_avg(&se->avg_wakeup, sample);
-
- se->last_wakeup = se->sum_exec_runtime;
- }
-
out_running:
- trace_sched_wakeup(rq, p, success);
- check_preempt_curr(rq, p, wake_flags);
-
- p->state = TASK_RUNNING;
-#ifdef CONFIG_SMP
- if (p->sched_class->task_woken)
- p->sched_class->task_woken(rq, p);
-
- if (unlikely(rq->idle_stamp)) {
- u64 delta = rq->clock - rq->idle_stamp;
- u64 max = 2*sysctl_sched_migration_cost;
-
- if (delta > max)
- rq->avg_idle = max;
- else
- update_avg(&rq->avg_idle, delta);
- rq->idle_stamp = 0;
- }
-#endif
+ ttwu_post_activation(p, rq, wake_flags, success);
out:
task_rq_unlock(rq, &flags);
put_cpu();

Tejun Heo

unread,
Jan 17, 2010, 8:00:03 PM1/17/10
to
Implement worker states. After created, a worker is STARTED. While a
worker isn't processing a work, it's IDLE and chained on
gcwq->idle_list. While processing a work, a worker is BUSY and
chained on gcwq->busy_hash. Also, gcwq now counts the number of all
workers and idle ones.

worker_thread() is restructured to reflect state transitions.
cwq->more_work is removed and waking up a worker makes it check for
events. A worker is killed by setting DIE flag while it's IDLE and
waking it up.

This gives gcwq better visibility of what's going on and allows it to
quickly find out whether a work is executing, which is necessary to
have multiple workers processing the same cwq.

Signed-off-by: Tejun Heo <t...@kernel.org>
---

kernel/workqueue.c | 207 ++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 170 insertions(+), 37 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 91c924c..104f72f 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -35,6 +35,17 @@
#include <linux/lockdep.h>
#include <linux/idr.h>

+enum {
+ /* worker flags */
+ WORKER_STARTED = 1 << 0, /* started */
+ WORKER_DIE = 1 << 1, /* die die die */
+ WORKER_IDLE = 1 << 2, /* is idle */
+
+ BUSY_WORKER_HASH_ORDER = 5, /* 32 pointers */
+ BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
+ BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
+};
+
/*
* Structure fields follow one of the following exclusion rules.
*
@@ -51,11 +62,18 @@ struct global_cwq;
struct cpu_workqueue_struct;

struct worker {
+ /* on idle list while idle, on busy hash table while busy */
+ union {
+ struct list_head entry; /* L: while idle */
+ struct hlist_node hentry; /* L: while busy */
+ };
+
struct work_struct *current_work; /* L: work being processed */
struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */
struct global_cwq *gcwq; /* I: the associated gcwq */
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
+ unsigned int flags; /* L: flags */
int id; /* I: worker id */
};

@@ -65,6 +83,15 @@ struct worker {
struct global_cwq {
spinlock_t lock; /* the gcwq lock */
unsigned int cpu; /* I: the associated cpu */
+
+ int nr_workers; /* L: total number of workers */
+ int nr_idle; /* L: currently idle ones */
+
+ /* workers are chained either in the idle_list or busy_hash */
+ struct list_head idle_list; /* L: list of idle workers */
+ struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
+ /* L: hash of busy workers */
+
struct ida worker_ida; /* L: for worker IDs */
} ____cacheline_aligned_in_smp;

@@ -77,7 +104,6 @@ struct global_cwq {
struct cpu_workqueue_struct {
struct global_cwq *gcwq; /* I: the associated gcwq */
struct list_head worklist;
- wait_queue_head_t more_work;
struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
@@ -300,6 +326,33 @@ static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
}

/**
+ * busy_worker_head - return the busy hash head for a work
+ * @gcwq: gcwq of interest
+ * @work: work to be hashed
+ *
+ * Return hash head of @gcwq for @work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to the hash head.
+ */
+static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
+ struct work_struct *work)
+{
+ const int base_shift = ilog2(sizeof(struct work_struct));
+ unsigned long v = (unsigned long)work;
+
+ /* simple shift and fold hash, do we need something better? */
+ v >>= base_shift;
+ v += v >> BUSY_WORKER_HASH_ORDER;
+ v &= BUSY_WORKER_HASH_MASK;
+
+ return &gcwq->busy_hash[v];
+}
+
+/**
* insert_work - insert a work into cwq
* @cwq: cwq @work belongs to
* @work: work to insert
@@ -325,7 +378,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
smp_wmb();

list_add_tail(&work->entry, head);
- wake_up(&cwq->more_work);
+ wake_up_process(cwq->worker->task);
}

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
@@ -463,13 +516,59 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

+/**
+ * worker_enter_idle - enter idle state
+ * @worker: worker which is entering idle state
+ *
+ * @worker is entering idle state. Update stats and idle timer if
+ * necessary.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_enter_idle(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+
+ BUG_ON(worker->flags & WORKER_IDLE);
+ BUG_ON(!list_empty(&worker->entry) &&
+ (worker->hentry.next || worker->hentry.pprev));
+
+ worker->flags |= WORKER_IDLE;
+ gcwq->nr_idle++;
+
+ /* idle_list is LIFO */
+ list_add(&worker->entry, &gcwq->idle_list);
+}
+
+/**
+ * worker_leave_idle - leave idle state
+ * @worker: worker which is leaving idle state
+ *
+ * @worker is leaving idle state. Update stats.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_leave_idle(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+
+ BUG_ON(!(worker->flags & WORKER_IDLE));
+ worker->flags &= ~WORKER_IDLE;
+ gcwq->nr_idle--;
+ list_del_init(&worker->entry);
+}
+
static struct worker *alloc_worker(void)
{
struct worker *worker;

worker = kzalloc(sizeof(*worker), GFP_KERNEL);
- if (worker)
+ if (worker) {
+ INIT_LIST_HEAD(&worker->entry);
INIT_LIST_HEAD(&worker->scheduled);
+ }
return worker;
}

@@ -534,13 +633,16 @@ fail:
* start_worker - start a newly created worker
* @worker: worker to start
*
- * Start @worker.
+ * Make the gcwq aware of @worker and start it.
*
* CONTEXT:
* spin_lock_irq(gcwq->lock).
*/
static void start_worker(struct worker *worker)
{
+ worker->flags |= WORKER_STARTED;
+ worker->gcwq->nr_workers++;
+ worker_enter_idle(worker);
wake_up_process(worker->task);
}

@@ -548,7 +650,10 @@ static void start_worker(struct worker *worker)
* destroy_worker - destroy a workqueue worker
* @worker: worker to be destroyed
*
- * Destroy @worker.
+ * Destroy @worker and adjust @gcwq stats accordingly.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
*/
static void destroy_worker(struct worker *worker)
{
@@ -559,12 +664,21 @@ static void destroy_worker(struct worker *worker)
BUG_ON(worker->current_work);
BUG_ON(!list_empty(&worker->scheduled));

+ if (worker->flags & WORKER_STARTED)
+ gcwq->nr_workers--;
+ if (worker->flags & WORKER_IDLE)
+ gcwq->nr_idle--;
+
+ list_del_init(&worker->entry);
+ worker->flags |= WORKER_DIE;
+
+ spin_unlock_irq(&gcwq->lock);
+
kthread_stop(worker->task);
kfree(worker);

spin_lock_irq(&gcwq->lock);
ida_remove(&gcwq->worker_ida, id);
- spin_unlock_irq(&gcwq->lock);
}

/**
@@ -680,6 +794,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
{


struct cpu_workqueue_struct *cwq = worker->cwq;

struct global_cwq *gcwq = cwq->gcwq;
+ struct hlist_head *bwh = busy_worker_head(gcwq, work);
work_func_t f = work->func;
int work_color;
#ifdef CONFIG_LOCKDEP
@@ -694,6 +809,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
#endif
/* claim and process */
debug_work_deactivate(work);
+ hlist_add_head(&worker->hentry, bwh);
worker->current_work = work;
work_color = work_flags_to_color(*work_data_bits(work));
list_del_init(&work->entry);
@@ -721,6 +837,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
spin_lock_irq(&gcwq->lock);

/* we're done with it, release */
+ hlist_del_init(&worker->hentry);
worker->current_work = NULL;
cwq_dec_nr_in_flight(cwq, work_color);
}
@@ -757,42 +874,52 @@ static int worker_thread(void *__worker)
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;


struct cpu_workqueue_struct *cwq = worker->cwq;

- DEFINE_WAIT(wait);

- for (;;) {
- prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
- if (!kthread_should_stop() &&
- list_empty(&cwq->worklist))
- schedule();
- finish_wait(&cwq->more_work, &wait);
+woke_up:
+ spin_lock_irq(&gcwq->lock);

- if (kthread_should_stop())
- break;
+ /* DIE can be set only while we're idle, checking here is enough */
+ if (worker->flags & WORKER_DIE) {
+ spin_unlock_irq(&gcwq->lock);
+ return 0;
+ }

- spin_lock_irq(&gcwq->lock);
+ worker_leave_idle(worker);

- while (!list_empty(&cwq->worklist)) {
- struct work_struct *work =
- list_first_entry(&cwq->worklist,
- struct work_struct, entry);
-
- if (likely(!(*work_data_bits(work) &
- WORK_STRUCT_LINKED))) {
- /* optimization path, not strictly necessary */
- process_one_work(worker, work);
- if (unlikely(!list_empty(&worker->scheduled)))
- process_scheduled_works(worker);
- } else {
- move_linked_works(work, &worker->scheduled,
- NULL);
+ /*
+ * ->scheduled list can only be filled while a worker is
+ * preparing to process a work or actually processing it.
+ * Make sure nobody diddled with it while I was sleeping.
+ */
+ BUG_ON(!list_empty(&worker->scheduled));
+
+ while (!list_empty(&cwq->worklist)) {
+ struct work_struct *work =
+ list_first_entry(&cwq->worklist,
+ struct work_struct, entry);
+
+ if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
+ /* optimization path, not strictly necessary */
+ process_one_work(worker, work);
+ if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
- }
+ } else {
+ move_linked_works(work, &worker->scheduled, NULL);
+ process_scheduled_works(worker);
}
-
- spin_unlock_irq(&gcwq->lock);
}

- return 0;
+ /*
+ * gcwq->lock is held and there's no work to process, sleep.
+ * Workers are woken up only while holding gcwq->lock, so
+ * setting the current state before releasing gcwq->lock is
+ * enough to prevent losing any event.
+ */
+ worker_enter_idle(worker);
+ __set_current_state(TASK_INTERRUPTIBLE);
+ spin_unlock_irq(&gcwq->lock);
+ schedule();
+ goto woke_up;
}

struct wq_barrier {
@@ -1551,7 +1678,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
cwq->max_active = max_active;
INIT_LIST_HEAD(&cwq->worklist);
INIT_LIST_HEAD(&cwq->delayed_works);
- init_waitqueue_head(&cwq->more_work);

if (failed)
continue;
@@ -1602,7 +1728,7 @@ EXPORT_SYMBOL_GPL(__create_workqueue_key);
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
- int cpu;
+ unsigned int cpu;

flush_workqueue(wq);

@@ -1619,8 +1745,10 @@ void destroy_workqueue(struct workqueue_struct *wq)
int i;

if (cwq->worker) {
+ spin_lock_irq(&cwq->gcwq->lock);
destroy_worker(cwq->worker);
cwq->worker = NULL;
+ spin_unlock_irq(&cwq->gcwq->lock);
}

for (i = 0; i < WORK_NR_COLORS; i++)
@@ -1835,7 +1963,7 @@ void thaw_workqueues(void)
cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);

- wake_up(&cwq->more_work);
+ wake_up_process(cwq->worker->task);
}

spin_unlock_irq(&gcwq->lock);
@@ -1850,6 +1978,7 @@ out_unlock:


void __init init_workqueues(void)
{
unsigned int cpu;

+ int i;

/*
* cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
@@ -1869,6 +1998,10 @@ void __init init_workqueues(void)
spin_lock_init(&gcwq->lock);
gcwq->cpu = cpu;

+ INIT_LIST_HEAD(&gcwq->idle_list);
+ for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
+ INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+
ida_init(&gcwq->worker_ida);

Tejun Heo

unread,
Jan 17, 2010, 8:00:03 PM1/17/10
to
Reimplement CPU hotplugging support using trustee thread. On CPU
down, a trustee thread is created and each step of CPU down is
executed by the trustee and workqueue_cpu_callback() simply drives and
waits for trustee state transitions.

CPU down operation no longer waits for works to be drained but trustee
sticks around till all pending works have been completed. If CPU is
brought back up while works are still draining,
workqueue_cpu_callback() tells trustee to step down and rebinds
workers to the cpu.

As it's difficult to tell whether cwqs are empty if it's freezing or
frozen, trustee doesn't consider draining to be complete while a gcwq
is freezing or frozen (tracked by new GCWQ_FREEZING flag). Also,
workers which get unbound from their cpu are marked with WORKER_ROGUE.

Trustee based implementation doesn't bring any new feature at this
point but it will be used to manage worker pool when dynamic shared
worker pool is implemented.

Signed-off-by: Tejun Heo <t...@kernel.org>
---

kernel/workqueue.c | 296 +++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 281 insertions(+), 15 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 104f72f..265f480 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -36,14 +36,27 @@
#include <linux/idr.h>

enum {
+ /* global_cwq flags */
+ GCWQ_FREEZING = 1 << 3, /* freeze in progress */


+
/* worker flags */

WORKER_STARTED = 1 << 0, /* started */

WORKER_DIE = 1 << 1, /* die die die */

WORKER_IDLE = 1 << 2, /* is idle */

+ WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
+
+ /* gcwq->trustee_state */
+ TRUSTEE_START = 0, /* start */
+ TRUSTEE_IN_CHARGE = 1, /* trustee in charge of gcwq */
+ TRUSTEE_BUTCHER = 2, /* butcher workers */
+ TRUSTEE_RELEASE = 3, /* release workers */
+ TRUSTEE_DONE = 4, /* trustee is done */



BUSY_WORKER_HASH_ORDER = 5, /* 32 pointers */

BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,


BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
+

+ TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
};

/*
@@ -83,6 +96,7 @@ struct worker {


struct global_cwq {
spinlock_t lock; /* the gcwq lock */
unsigned int cpu; /* I: the associated cpu */

+ unsigned int flags; /* L: GCWQ_* flags */



int nr_workers; /* L: total number of workers */

int nr_idle; /* L: currently idle ones */

@@ -93,6 +107,10 @@ struct global_cwq {


/* L: hash of busy workers */

struct ida worker_ida; /* L: for worker IDs */

+
+ struct task_struct *trustee; /* L: for gcwq shutdown */
+ unsigned int trustee_state; /* L: trustee state */
+ wait_queue_head_t trustee_wait; /* trustee wait */
} ____cacheline_aligned_in_smp;

/*
@@ -148,6 +166,10 @@ struct workqueue_struct {
#endif
};

+#define for_each_busy_worker(worker, i, pos, gcwq) \


+ for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \

+ hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
+
#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;
@@ -539,6 +561,9 @@ static void worker_enter_idle(struct worker *worker)



/* idle_list is LIFO */

list_add(&worker->entry, &gcwq->idle_list);
+

+ if (unlikely(worker->flags & WORKER_ROGUE))
+ wake_up_all(&gcwq->trustee_wait);
}

/**
@@ -615,8 +640,15 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
if (IS_ERR(worker->task))
goto fail;

+ /*
+ * A rogue worker will become a regular one if CPU comes
+ * online later on. Make sure every worker has
+ * PF_THREAD_BOUND set.
+ */
if (bind)
kthread_bind(worker->task, gcwq->cpu);
+ else
+ worker->task->flags |= PF_THREAD_BOUND;

return worker;
fail:
@@ -1762,34 +1794,259 @@ void destroy_workqueue(struct workqueue_struct *wq)
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

+/*
+ * CPU hotplug.
+ *
+ * CPU hotplug is implemented by allowing cwqs to be detached from
+ * CPU, running with unbound workers and allowing them to be
+ * reattached later if the cpu comes back online. A separate thread
+ * is created to govern cwqs in such state and is called the trustee.
+ *
+ * Trustee states and their descriptions.
+ *
+ * START Command state used on startup. On CPU_DOWN_PREPARE, a
+ * new trustee is started with this state.
+ *
+ * IN_CHARGE Once started, trustee will enter this state after
+ * making all existing workers rogue. DOWN_PREPARE waits
+ * for trustee to enter this state. After reaching
+ * IN_CHARGE, trustee tries to execute the pending
+ * worklist until it's empty and the state is set to
+ * BUTCHER, or the state is set to RELEASE.
+ *
+ * BUTCHER Command state which is set by the cpu callback after
+ * the cpu has gone down. Once this state is set trustee
+ * knows that there will be no new works on the worklist
+ * and once the worklist is empty it can proceed to
+ * killing idle workers.
+ *
+ * RELEASE Command state which is set by the cpu callback if the
+ * cpu down has been canceled or it has come online
+ * again. After recognizing this state, trustee stops
+ * trying to drain or butcher and transits to DONE.
+ *
+ * DONE Trustee will enter this state after BUTCHER or RELEASE
+ * is complete.
+ *
+ *          trustee                 CPU                draining
+ *         took over                down               complete
+ * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
+ *                        |                     |                  ^
+ *                        | CPU is back online  v   return workers |
+ *                         ----------------> RELEASE --------------
+ */
+
+/**
+ * trustee_wait_event_timeout - timed event wait for trustee
+ * @cond: condition to wait for
+ * @timeout: timeout in jiffies
+ *
+ * wait_event_timeout() for trustee to use. Handles locking and
+ * checks for RELEASE request.


+ *
+ * CONTEXT:

+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. To be used by trustee.


+ *
+ * RETURNS:

+ * Positive value indicating the remaining time if @cond is satisfied,
+ * 0 if timed out, -1 if canceled.
+ */
+#define trustee_wait_event_timeout(cond, timeout) ({ \
+ long __ret = (timeout); \
+ while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
+ __ret) { \
+ spin_unlock_irq(&gcwq->lock); \
+ __wait_event_timeout(gcwq->trustee_wait, (cond) || \
+ (gcwq->trustee_state == TRUSTEE_RELEASE), \
+ __ret); \
+ spin_lock_irq(&gcwq->lock); \
+ } \
+ gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret); \
+})
+
+/**
+ * trustee_wait_event - event wait for trustee
+ * @cond: condition to wait for
+ *
+ * wait_event() for trustee to use. Automatically handles locking and
+ * checks for RELEASE request.


+ *
+ * CONTEXT:

+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. To be used by trustee.


+ *
+ * RETURNS:

+ * 0 if @cond is satisfied, -1 if canceled.
+ */
+#define trustee_wait_event(cond) ({ \
+ long __ret1; \
+ __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
+ __ret1 < 0 ? -1 : 0; \
+})
+
+static bool __cpuinit trustee_unset_rogue(struct worker *worker)


+{
+ struct global_cwq *gcwq = worker->gcwq;

+ int rc;
+
+ if (!(worker->flags & WORKER_ROGUE))
+ return false;
+
+ spin_unlock_irq(&gcwq->lock);
+ rc = __set_cpus_allowed(worker->task, get_cpu_mask(gcwq->cpu), true);
+ BUG_ON(rc);
+ spin_lock_irq(&gcwq->lock);
+ worker->flags &= ~WORKER_ROGUE;
+ return true;
+}
+
+static int __cpuinit trustee_thread(void *__gcwq)
+{
+ struct global_cwq *gcwq = __gcwq;
+ struct worker *worker;
+ struct hlist_node *pos;
+ int i;
+
+ BUG_ON(gcwq->cpu != smp_processor_id());
+
+ spin_lock_irq(&gcwq->lock);
+ /*
+ * Make all multithread workers rogue. Trustee must be bound
+ * to the target cpu and can't be cancelled.
+ */
+ BUG_ON(gcwq->cpu != smp_processor_id());
+
+ list_for_each_entry(worker, &gcwq->idle_list, entry)
+ if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+ worker->flags |= WORKER_ROGUE;
+
+ for_each_busy_worker(worker, i, pos, gcwq)
+ if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+ worker->flags |= WORKER_ROGUE;
+
+ /*
+ * We're now in charge. Notify and proceed to drain. We need
+ * to keep the gcwq running during the whole CPU down
+ * procedure as other cpu hotunplug callbacks may need to
+ * flush currently running tasks.
+ */
+ gcwq->trustee_state = TRUSTEE_IN_CHARGE;
+ wake_up_all(&gcwq->trustee_wait);
+
+ /*
+ * The original cpu is in the process of dying and may go away
+ * anytime now. When that happens, we and all workers would
+ * be migrated to other cpus. Try draining any left work.
+ * Note that if the gcwq is frozen, there may be frozen works
+ * in freezeable cwqs. Don't declare completion while frozen.
+ */
+ while (gcwq->nr_workers != gcwq->nr_idle ||
+ gcwq->flags & GCWQ_FREEZING ||
+ gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+ /* give a breather */
+ if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
+ break;
+ }
+
+ /* notify completion */
+ gcwq->trustee = NULL;
+ gcwq->trustee_state = TRUSTEE_DONE;
+ wake_up_all(&gcwq->trustee_wait);


+ spin_unlock_irq(&gcwq->lock);
+ return 0;
+}

+
+/**
+ * wait_trustee_state - wait for trustee to enter the specified state
+ * @gcwq: gcwq the trustee of interest belongs to
+ * @state: target state to wait for
+ *
+ * Wait for the trustee to reach @state. DONE is always matched.


+ *
+ * CONTEXT:

+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. To be used by cpu_callback.
+ */
+static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
+{
+ if (!(gcwq->trustee_state == state ||
+ gcwq->trustee_state == TRUSTEE_DONE)) {
+ spin_unlock_irq(&gcwq->lock);
+ __wait_event(gcwq->trustee_wait,
+ gcwq->trustee_state == state ||
+ gcwq->trustee_state == TRUSTEE_DONE);
+ spin_lock_irq(&gcwq->lock);
+ }
+}
+
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
unsigned int cpu = (unsigned long)hcpu;
- struct cpu_workqueue_struct *cwq;
- struct workqueue_struct *wq;
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ struct task_struct *new_trustee = NULL;
+ struct worker *worker;
+ struct hlist_node *pos;
+ int i;

action &= ~CPU_TASKS_FROZEN;

- list_for_each_entry(wq, &workqueues, list) {
- if (wq->flags & WQ_SINGLE_THREAD)
- continue;
-
- cwq = get_cwq(cpu, wq);
+ switch (action) {
+ case CPU_DOWN_PREPARE:
+ new_trustee = kthread_create(trustee_thread, gcwq,
+ "workqueue_trustee/%d\n", cpu);
+ if (IS_ERR(new_trustee))
+ return NOTIFY_BAD;
+ kthread_bind(new_trustee, cpu);
+ }

- switch (action) {
- case CPU_ONLINE:
- __set_cpus_allowed(cwq->worker->task,
- get_cpu_mask(cpu), true);
- break;
+ spin_lock_irq(&gcwq->lock);

- case CPU_POST_DEAD:
- flush_workqueue(wq);
- break;
+ switch (action) {
+ case CPU_DOWN_PREPARE:
+ /* initialize trustee and tell it to acquire the gcwq */
+ BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+ gcwq->trustee = new_trustee;
+ gcwq->trustee_state = TRUSTEE_START;
+ wake_up_process(gcwq->trustee);
+ wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+ break;
+
+ case CPU_POST_DEAD:
+ gcwq->trustee_state = TRUSTEE_BUTCHER;
+ break;
+
+ case CPU_DOWN_FAILED:
+ case CPU_ONLINE:
+ if (gcwq->trustee_state != TRUSTEE_DONE) {
+ gcwq->trustee_state = TRUSTEE_RELEASE;
+ wake_up_process(gcwq->trustee);
+ wait_trustee_state(gcwq, TRUSTEE_DONE);
}
+
+ /*
+ * Clear ROGUE from and rebind all multithread
+ * workers. Unsetting ROGUE and rebinding require
+ * dropping gcwq->lock. Restart loop after each
+ * successful release.
+ */
+ recheck:
+ list_for_each_entry(worker, &gcwq->idle_list, entry)
+ if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD) &&
+ trustee_unset_rogue(worker))
+ goto recheck;
+
+ for_each_busy_worker(worker, i, pos, gcwq)
+ if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD) &&
+ trustee_unset_rogue(worker))
+ goto recheck;
+ break;
}

+ spin_unlock_irq(&gcwq->lock);
+
return NOTIFY_OK;
}

@@ -1867,6 +2124,9 @@ void freeze_workqueues_begin(void)

spin_lock_irq(&gcwq->lock);

+ BUG_ON(gcwq->flags & GCWQ_FREEZING);
+ gcwq->flags |= GCWQ_FREEZING;
+
list_for_each_entry(wq, &workqueues, list) {


struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

@@ -1950,6 +2210,9 @@ void thaw_workqueues(void)

spin_lock_irq(&gcwq->lock);

+ BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
+ gcwq->flags &= ~GCWQ_FREEZING;
+
list_for_each_entry(wq, &workqueues, list) {


struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

@@ -2003,6 +2266,9 @@ void __init init_workqueues(void)
INIT_HLIST_HEAD(&gcwq->busy_hash[i]);

ida_init(&gcwq->worker_ida);
+
+ gcwq->trustee_state = TRUSTEE_DONE;
+ init_waitqueue_head(&gcwq->trustee_wait);
}

keventd_wq = create_workqueue("events");

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
Define WQ_MAX_ACTIVE and create keventd with max_active set to half of
it which means that keventd now can process up to WQ_MAX_ACTIVE / 2 - 1
works concurrently. Unless some combination can result in a dependency
loop longer than max_active, deadlock won't happen and thus it's
unnecessary to check whether current_is_keventd() before trying to
schedule a work. Kill current_is_keventd().

(Lockdep annotations are broken. We need lock_map_acquire_read_norecurse())

NOT_SIGNED_OFF_YET: Tejun Heo <t...@kernel.org>
Cc: Ingo Molnar <mi...@elte.hu>
Cc: Thomas Gleixner <tg...@linutronix.de>
Cc: Christoph Lameter <c...@linux-foundation.org>
Cc: Tony Luck <tony...@intel.com>
Cc: Andi Kleen <a...@linux.intel.com>
Cc: Oleg Nesterov <ol...@redhat.com>
---
arch/ia64/kernel/smpboot.c | 2 +-
arch/x86/kernel/smpboot.c | 2 +-
include/linux/workqueue.h | 3 +-
kernel/workqueue.c | 54 +++----------------------------------------
4 files changed, 8 insertions(+), 53 deletions(-)

diff --git a/arch/ia64/kernel/smpboot.c b/arch/ia64/kernel/smpboot.c
index de100aa..3a46feb 100644
--- a/arch/ia64/kernel/smpboot.c
+++ b/arch/ia64/kernel/smpboot.c
@@ -516,7 +516,7 @@ do_boot_cpu (int sapicid, int cpu)
/*
* We can't use kernel_thread since we must avoid to reschedule the child.
*/
- if (!keventd_up() || current_is_keventd())
+ if (!keventd_up())
c_idle.work.func(&c_idle.work);
else {
schedule_work(&c_idle.work);
diff --git a/arch/x86/kernel/smpboot.c b/arch/x86/kernel/smpboot.c
index 678d0b8..93175af 100644
--- a/arch/x86/kernel/smpboot.c
+++ b/arch/x86/kernel/smpboot.c
@@ -724,7 +724,7 @@ static int __cpuinit do_boot_cpu(int apicid, int cpu)
goto do_rest;
}

- if (!keventd_up() || current_is_keventd())
+ if (!keventd_up())
c_idle.work.func(&c_idle.work);
else {
schedule_work(&c_idle.work);
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index adb3080..f43a260 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -214,6 +214,8 @@ enum {
WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
WQ_RESCUER = 1 << 2, /* has an rescue worker */
+
+ WQ_MAX_ACTIVE = 256, /* I like 256, better ideas? */
};

extern struct workqueue_struct *
@@ -267,7 +269,6 @@ extern int schedule_delayed_work(struct delayed_work *work, unsigned long delay)
extern int schedule_delayed_work_on(int cpu, struct delayed_work *work,
unsigned long delay);
extern int schedule_on_each_cpu(work_func_t func);
-extern int current_is_keventd(void);


extern int keventd_up(void);

extern void init_workqueues(void);

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 9baf7a8..9833774 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -2239,7 +2239,6 @@ EXPORT_SYMBOL(schedule_delayed_work_on);
int schedule_on_each_cpu(work_func_t func)
{
int cpu;
- int orig = -1;
struct work_struct *works;

works = alloc_percpu(struct work_struct);
@@ -2248,23 +2247,12 @@ int schedule_on_each_cpu(work_func_t func)

get_online_cpus();

- /*
- * When running in keventd don't schedule a work item on
- * itself. Can just call directly because the work queue is
- * already bound. This also is faster.
- */
- if (current_is_keventd())
- orig = raw_smp_processor_id();
-
for_each_online_cpu(cpu) {
struct work_struct *work = per_cpu_ptr(works, cpu);

INIT_WORK(work, func);
- if (cpu != orig)
- schedule_work_on(cpu, work);
+ schedule_work_on(cpu, work);
}
- if (orig >= 0)
- func(per_cpu_ptr(works, orig));

for_each_online_cpu(cpu)
flush_work(per_cpu_ptr(works, cpu));
@@ -2311,41 +2299,6 @@ int keventd_up(void)
return keventd_wq != NULL;
}

-int current_is_keventd(void)
-{
- bool found = false;
- unsigned int cpu;
-
- /*
- * There no longer is one-to-one relation between worker and
- * work queue and a worker task might be unbound from its cpu
- * if the cpu was offlined. Match all busy workers. This
- * function will go away once dynamic pool is implemented.
- */
- for_each_possible_cpu(cpu) {
- struct global_cwq *gcwq = get_gcwq(cpu);
- struct worker *worker;
- struct hlist_node *pos;
- unsigned long flags;
- int i;
-
- spin_lock_irqsave(&gcwq->lock, flags);
-
- for_each_busy_worker(worker, i, pos, gcwq) {
- if (worker->task == current) {
- found = true;
- break;
- }
- }
-
- spin_unlock_irqrestore(&gcwq->lock, flags);
- if (found)
- break;
- }
-
- return found;
-}
-
static struct cpu_workqueue_struct *alloc_cwqs(void)
{
const size_t size = sizeof(struct cpu_workqueue_struct);
@@ -2393,7 +2346,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct workqueue_struct *wq;
unsigned int cpu;

- max_active = clamp_val(max_active, 1, INT_MAX);
+ WARN_ON(max_active < 1 || max_active > WQ_MAX_ACTIVE);
+ max_active = clamp_val(max_active, 1, WQ_MAX_ACTIVE);

wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
@@ -3136,6 +3090,6 @@ void __init init_workqueues(void)
spin_unlock_irq(&gcwq->lock);
}

- keventd_wq = create_workqueue("events");
+ keventd_wq = __create_workqueue("events", 0, WQ_MAX_ACTIVE);
BUG_ON(!keventd_wq);

Tejun Heo

unread,
Jan 17, 2010, 8:00:03 PM1/17/10
to
A work is linked to the next one by having WORK_STRUCT_LINKED bit set
and these links can be chained. When a linked work is dispatched to a
worker, all linked works are dispatched to the worker's newly added
->scheduled queue and processed back-to-back.

Currently, as there's only a single worker per cwq, having linked works
doesn't make any visible behavior difference. This change is to
prepare for multiple shared workers per cpu.

Signed-off-by: Tejun Heo <t...@kernel.org>
---

include/linux/workqueue.h | 2 +
kernel/workqueue.c | 152 ++++++++++++++++++++++++++++++++++++++------
2 files changed, 133 insertions(+), 21 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 316cc48..a6650f1 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -25,9 +25,11 @@ typedef void (*work_func_t)(struct work_struct *work);
enum {
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
WORK_STRUCT_STATIC_BIT = 1, /* static initializer (debugobjects) */
+ WORK_STRUCT_LINKED_BIT = 2, /* next work is linked to this one */

WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,
+ WORK_STRUCT_LINKED = 1 << WORK_STRUCT_LINKED_BIT,

WORK_STRUCT_COLOR_SHIFT = 3, /* color for workqueue flushing */
WORK_STRUCT_COLOR_BITS = 4,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 17d270b..2ac1624 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -51,6 +51,7 @@ struct cpu_workqueue_struct;

struct worker {


struct work_struct *current_work; /* L: work being processed */

+ struct list_head scheduled; /* L: scheduled works */


struct task_struct *task; /* I: worker task */

struct cpu_workqueue_struct *cwq; /* I: the associated cwq */

int id; /* I: worker id */

@@ -438,6 +439,8 @@ static struct worker *alloc_worker(void)


struct worker *worker;

worker = kzalloc(sizeof(*worker), GFP_KERNEL);

+ if (worker)
+ INIT_LIST_HEAD(&worker->scheduled);
return worker;
}

@@ -523,6 +526,7 @@ static void destroy_worker(struct worker *worker)

/* sanity check frenzy */
BUG_ON(worker->current_work);
+ BUG_ON(!list_empty(&worker->scheduled));

kthread_stop(worker->task);
kfree(worker);
@@ -533,6 +537,48 @@ static void destroy_worker(struct worker *worker)
}

/**
+ * move_linked_works - move linked works to a list
+ * @work: start of series of works to be scheduled
+ * @head: target list to append @work to
+ * @nextp: out parameter for nested worklist walking
+ *
+ * Schedule linked works starting from @work to @head. Work series to
+ * be scheduled starts at @work and includes any consecutive work with
+ * WORK_STRUCT_LINKED set in its predecessor.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of
+ * the last scheduled work. This allows move_linked_works() to be
+ * nested inside outer list_for_each_entry_safe().


+ *
+ * CONTEXT:

+ * spin_lock_irq(cwq->lock).
+ */
+static void move_linked_works(struct work_struct *work, struct list_head *head,
+ struct work_struct **nextp)
+{
+ struct work_struct *n;
+
+ /*
+ * Linked worklist will always end before the end of the list,
+ * use NULL for list head.
+ */
+ work = list_entry(work->entry.prev, struct work_struct, entry);
+ list_for_each_entry_safe_continue(work, n, NULL, entry) {
+ list_move_tail(&work->entry, head);
+ if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))


+ break;
+ }
+
+ /*

+ * If we're already inside safe list traversal and have moved
+ * multiple works to the scheduled queue, the next position
+ * needs to be updated.
+ */
+ if (nextp)
+ *nextp = n;
+}
+
+/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
* @color: color of work which left the queue
@@ -632,17 +678,25 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
cwq_dec_nr_in_flight(cwq, work_color);
}

-static void run_workqueue(struct worker *worker)
+/**
+ * process_scheduled_works - process scheduled works
+ * @worker: self
+ *
+ * Process all scheduled works. Please note that the scheduled list
+ * may change while processing a work, so this function repeatedly
+ * fetches a work from the top and executes it.


+ *
+ * CONTEXT:

+ * spin_lock_irq(cwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void process_scheduled_works(struct worker *worker)
{
- struct cpu_workqueue_struct *cwq = worker->cwq;
-
- spin_lock_irq(&cwq->lock);
- while (!list_empty(&cwq->worklist)) {
- struct work_struct *work = list_entry(cwq->worklist.next,
+ while (!list_empty(&worker->scheduled)) {
+ struct work_struct *work = list_first_entry(&worker->scheduled,
struct work_struct, entry);
process_one_work(worker, work);
}
- spin_unlock_irq(&cwq->lock);
}

/**
@@ -673,7 +727,27 @@ static int worker_thread(void *__worker)
if (kthread_should_stop())
break;

- run_workqueue(worker);
+ spin_lock_irq(&cwq->lock);


+
+ while (!list_empty(&cwq->worklist)) {
+ struct work_struct *work =
+ list_first_entry(&cwq->worklist,
+ struct work_struct, entry);
+
+ if (likely(!(*work_data_bits(work) &

+ WORK_STRUCT_LINKED))) {


+ /* optimization path, not strictly necessary */
+ process_one_work(worker, work);
+ if (unlikely(!list_empty(&worker->scheduled)))

+ process_scheduled_works(worker);


+ } else {
+ move_linked_works(work, &worker->scheduled,

+ NULL);
+ process_scheduled_works(worker);
+ }
+ }
+
+ spin_unlock_irq(&cwq->lock);
}

return 0;
@@ -694,16 +768,33 @@ static void wq_barrier_func(struct work_struct *work)
* insert_wq_barrier - insert a barrier work
* @cwq: cwq to insert barrier into
* @barr: wq_barrier to insert
- * @head: insertion point
+ * @target: target work to attach @barr to
+ * @worker: worker currently executing @target, NULL if @target is not executing
*
- * Insert barrier @barr into @cwq before @head.
+ * @barr is linked to @target such that @barr is completed only after
+ * @target finishes execution. Please note that the ordering
+ * guarantee is observed only with respect to @target and on the local
+ * cpu.
+ *
+ * Currently, a queued barrier can't be canceled. This is because
+ * try_to_grab_pending() can't determine whether the work to be
+ * grabbed is at the head of the queue and thus can't clear LINKED
+ * flag of the previous work while there must be a valid next work
+ * after a work with LINKED flag set.
+ *
+ * Note that when @worker is non-NULL, @target may be modified
+ * underneath us, so we can't reliably determine cwq from @target.
*
* CONTEXT:
* spin_lock_irq(cwq->lock).
*/
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
- struct wq_barrier *barr, struct list_head *head)
+ struct wq_barrier *barr,
+ struct work_struct *target, struct worker *worker)
{
+ struct list_head *head;
+ unsigned int linked = 0;
+
/*
* debugobject calls are safe here even with cwq->lock locked
* as we know for sure that this will not trigger any of the
@@ -714,8 +805,24 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);

+ /*
+ * If @target is currently being executed, schedule the
+ * barrier to the worker; otherwise, put it after @target.
+ */
+ if (worker)
+ head = worker->scheduled.next;
+ else {
+ unsigned long *bits = work_data_bits(target);
+
+ head = target->entry.next;
+ /* there can already be other linked works, inherit and set */
+ linked = *bits & WORK_STRUCT_LINKED;
+ *bits |= WORK_STRUCT_LINKED;
+ }
+
debug_work_activate(&barr->work);
- insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
+ insert_work(cwq, &barr->work, head,
+ work_color_to_flags(WORK_NO_COLOR) | linked);
}

/**
@@ -948,8 +1055,8 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
*/
int flush_work(struct work_struct *work)
{
+ struct worker *worker = NULL;
struct cpu_workqueue_struct *cwq;
- struct list_head *prev;
struct wq_barrier barr;

might_sleep();
@@ -969,14 +1076,14 @@ int flush_work(struct work_struct *work)
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
goto already_gone;
- prev = &work->entry;
} else {
- if (!cwq->worker || cwq->worker->current_work != work)
+ if (cwq->worker && cwq->worker->current_work == work)
+ worker = cwq->worker;
+ if (!worker)
goto already_gone;
- prev = &cwq->worklist;
}
- insert_wq_barrier(cwq, &barr, prev->next);

+ insert_wq_barrier(cwq, &barr, work, worker);
spin_unlock_irq(&cwq->lock);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
@@ -1033,16 +1140,19 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
struct wq_barrier barr;
- int running = 0;
+ struct worker *worker;

spin_lock_irq(&cwq->lock);
+
+ worker = NULL;
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
- insert_wq_barrier(cwq, &barr, cwq->worklist.next);
- running = 1;
+ worker = cwq->worker;
+ insert_wq_barrier(cwq, &barr, work, worker);
}
+
spin_unlock_irq(&cwq->lock);

- if (unlikely(running)) {
+ if (unlikely(worker)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);

Tejun Heo

unread,
Jan 17, 2010, 8:00:03 PM1/17/10
to
There is one gcwq (global cwq) per cpu and all cwqs on a cpu
point to it. A gcwq contains a lock to be used by all cwqs on the cpu
and an ida to give IDs to workers belonging to the cpu.

This patch introduces gcwq, moves worker_ida into gcwq and makes all
cwqs on the same cpu use the cpu's gcwq->lock instead of separate
locks. gcwq->worker_ida is now protected by gcwq->lock too.

Signed-off-by: Tejun Heo <t...@kernel.org>
---

kernel/workqueue.c | 156 ++++++++++++++++++++++++++++++++--------------------
1 files changed, 96 insertions(+), 60 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index eca3925..91c924c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -40,38 +40,45 @@
*
* I: Set during initialization and read-only afterwards.
*
- * L: cwq->lock protected. Access with cwq->lock held.
+ * L: gcwq->lock protected. Access with gcwq->lock held.
*
* F: wq->flush_mutex protected.
*
* W: workqueue_lock protected.
*/

+struct global_cwq;


struct cpu_workqueue_struct;

struct worker {
struct work_struct *current_work; /* L: work being processed */

struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */

+ struct global_cwq *gcwq; /* I: the associated gcwq */


struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
int id; /* I: worker id */

};

/*
+ * Global per-cpu workqueue.
+ */
+struct global_cwq {
+ spinlock_t lock; /* the gcwq lock */
+ unsigned int cpu; /* I: the associated cpu */
+ struct ida worker_ida; /* L: for worker IDs */
+} ____cacheline_aligned_in_smp;
+
+/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of
* work_struct->data are used for flags and thus cwqs need to be
* aligned at two's power of the number of flag bits.
*/
struct cpu_workqueue_struct {
-
- spinlock_t lock;
-
+ struct global_cwq *gcwq; /* I: the associated gcwq */
struct list_head worklist;
wait_queue_head_t more_work;
- unsigned int cpu;
struct worker *worker;
-


struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */

int flush_color; /* L: flushing color */
@@ -228,13 +235,19 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
-static DEFINE_PER_CPU(struct ida, worker_ida);


static bool workqueue_freezing; /* W: have wqs started freezing? */

+static DEFINE_PER_CPU(struct global_cwq, global_cwq);
+
static int worker_thread(void *__worker);

static int singlethread_cpu __read_mostly;

+static struct global_cwq *get_gcwq(unsigned int cpu)
+{
+ return &per_cpu(global_cwq, cpu);
+}
+
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
@@ -296,7 +309,7 @@ static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
* Insert @work into @cwq after @head.
*
* CONTEXT:
- * spin_lock_irq(cwq->lock).
+ * spin_lock_irq(gcwq->lock).
*/
static void insert_work(struct cpu_workqueue_struct *cwq,


struct work_struct *work, struct list_head *head,

@@ -319,12 +332,13 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
+ struct global_cwq *gcwq = cwq->gcwq;
struct list_head *worklist;
unsigned long flags;

debug_work_activate(work);

- spin_lock_irqsave(&cwq->lock, flags);
+ spin_lock_irqsave(&gcwq->lock, flags);
BUG_ON(!list_empty(&work->entry));

cwq->nr_in_flight[cwq->work_color]++;
@@ -337,7 +351,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,

insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));

- spin_unlock_irqrestore(&cwq->lock, flags);
+ spin_unlock_irqrestore(&gcwq->lock, flags);
}

/**
@@ -476,39 +490,41 @@ static struct worker *alloc_worker(void)
*/


static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)

{
+ struct global_cwq *gcwq = cwq->gcwq;
int id = -1;


struct worker *worker = NULL;

- spin_lock(&workqueue_lock);
- while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
- spin_unlock(&workqueue_lock);
- if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+ spin_lock_irq(&gcwq->lock);
+ while (ida_get_new(&gcwq->worker_ida, &id)) {
+ spin_unlock_irq(&gcwq->lock);
+ if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
goto fail;
- spin_lock(&workqueue_lock);
+ spin_lock_irq(&gcwq->lock);
}
- spin_unlock(&workqueue_lock);
+ spin_unlock_irq(&gcwq->lock);

worker = alloc_worker();
if (!worker)
goto fail;

+ worker->gcwq = gcwq;
worker->cwq = cwq;
worker->id = id;

worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
- cwq->cpu, id);
+ gcwq->cpu, id);


if (IS_ERR(worker->task))
goto fail;

if (bind)
- kthread_bind(worker->task, cwq->cpu);
+ kthread_bind(worker->task, gcwq->cpu);

return worker;
fail:
if (id >= 0) {
- spin_lock(&workqueue_lock);
- ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
- spin_unlock(&workqueue_lock);
+ spin_lock_irq(&gcwq->lock);
+ ida_remove(&gcwq->worker_ida, id);
+ spin_unlock_irq(&gcwq->lock);
}
kfree(worker);
return NULL;
@@ -521,7 +537,7 @@ fail:
* Start @worker.
*
* CONTEXT:
- * spin_lock_irq(cwq->lock).
+ * spin_lock_irq(gcwq->lock).


*/
static void start_worker(struct worker *worker)
{

@@ -536,7 +552,7 @@ static void start_worker(struct worker *worker)
*/


static void destroy_worker(struct worker *worker)

{
- int cpu = worker->cwq->cpu;


+ struct global_cwq *gcwq = worker->gcwq;

int id = worker->id;



/* sanity check frenzy */

@@ -546,9 +562,9 @@ static void destroy_worker(struct worker *worker)
kthread_stop(worker->task);
kfree(worker);

- spin_lock(&workqueue_lock);
- ida_remove(&per_cpu(worker_ida, cpu), id);
- spin_unlock(&workqueue_lock);
+ spin_lock_irq(&gcwq->lock);
+ ida_remove(&gcwq->worker_ida, id);
+ spin_unlock_irq(&gcwq->lock);
}

/**
@@ -566,7 +582,7 @@ static void destroy_worker(struct worker *worker)


* nested inside outer list_for_each_entry_safe().

*
* CONTEXT:
- * spin_lock_irq(cwq->lock).
+ * spin_lock_irq(gcwq->lock).
*/


static void move_linked_works(struct work_struct *work, struct list_head *head,

struct work_struct **nextp)
@@ -611,7 +627,7 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
* decrement nr_in_flight of its cwq and handle workqueue flushing.
*
* CONTEXT:
- * spin_lock_irq(cwq->lock).
+ * spin_lock_irq(gcwq->lock).
*/
static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
{
@@ -658,11 +674,12 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
* call this function to process a work.
*
* CONTEXT:
- * spin_lock_irq(cwq->lock) which is released and regrabbed.


+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
*/

static void process_one_work(struct worker *worker, struct work_struct *work)

{


struct cpu_workqueue_struct *cwq = worker->cwq;

+ struct global_cwq *gcwq = cwq->gcwq;


work_func_t f = work->func;
int work_color;
#ifdef CONFIG_LOCKDEP

@@ -681,7 +698,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)


work_color = work_flags_to_color(*work_data_bits(work));
list_del_init(&work->entry);

- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);

BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
@@ -701,7 +718,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
dump_stack();
}

- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);



/* we're done with it, release */

worker->current_work = NULL;
@@ -717,7 +734,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)


* fetches a work from the top and executes it.

*
* CONTEXT:
- * spin_lock_irq(cwq->lock) which may be released and regrabbed
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
* multiple times.
*/
static void process_scheduled_works(struct worker *worker)
@@ -738,6 +755,7 @@ static void process_scheduled_works(struct worker *worker)


static int worker_thread(void *__worker)
{
struct worker *worker = __worker;

+ struct global_cwq *gcwq = worker->gcwq;

struct cpu_workqueue_struct *cwq = worker->cwq;

DEFINE_WAIT(wait);

@@ -751,7 +769,7 @@ static int worker_thread(void *__worker)
if (kthread_should_stop())
break;

- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);

while (!list_empty(&cwq->worklist)) {
struct work_struct *work =
@@ -771,7 +789,7 @@ static int worker_thread(void *__worker)
}
}

- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);
}

return 0;
@@ -810,7 +828,7 @@ static void wq_barrier_func(struct work_struct *work)


* underneath us, so we can't reliably determine cwq from @target.
*
* CONTEXT:

- * spin_lock_irq(cwq->lock).
+ * spin_lock_irq(gcwq->lock).


*/
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,

struct wq_barrier *barr,
@@ -820,7 +838,7 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,


unsigned int linked = 0;

/*
- * debugobject calls are safe here even with cwq->lock locked
+ * debugobject calls are safe here even with gcwq->lock locked


* as we know for sure that this will not trigger any of the

* checks and call back into the fixup functions where we
* might deadlock.
@@ -890,8 +908,9 @@ static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,

for_each_possible_cpu(cpu) {


struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

+ struct global_cwq *gcwq = cwq->gcwq;

- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);

if (flush_color >= 0) {
BUG_ON(cwq->flush_color != -1);
@@ -908,7 +927,7 @@ static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
cwq->work_color = work_color;
}

- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);
}

return wait;
@@ -1081,17 +1100,19 @@ int flush_work(struct work_struct *work)
{


struct worker *worker = NULL;
struct cpu_workqueue_struct *cwq;

+ struct global_cwq *gcwq;
struct wq_barrier barr;

might_sleep();
cwq = get_wq_data(work);
if (!cwq)
return 0;


+ gcwq = cwq->gcwq;

lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);

- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) {
/*
* See the comment near try_to_grab_pending()->smp_rmb().
@@ -1108,12 +1129,12 @@ int flush_work(struct work_struct *work)
}

insert_wq_barrier(cwq, &barr, work, worker);
- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return 1;
already_gone:
- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);
return 0;
}
EXPORT_SYMBOL_GPL(flush_work);
@@ -1124,6 +1145,7 @@ EXPORT_SYMBOL_GPL(flush_work);
*/
static int try_to_grab_pending(struct work_struct *work)
{
+ struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
int ret = -1;

@@ -1138,8 +1160,9 @@ static int try_to_grab_pending(struct work_struct *work)
cwq = get_wq_data(work);
if (!cwq)
return ret;


+ gcwq = cwq->gcwq;

- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) {
/*
* This work is queued, but perhaps we locked the wrong cwq.
@@ -1155,7 +1178,7 @@ static int try_to_grab_pending(struct work_struct *work)
ret = 1;
}
}
- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);

return ret;
}
@@ -1163,10 +1186,11 @@ static int try_to_grab_pending(struct work_struct *work)


static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{

+ struct global_cwq *gcwq = cwq->gcwq;
struct wq_barrier barr;
struct worker *worker;

- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);



worker = NULL;
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {

@@ -1174,7 +1198,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
insert_wq_barrier(cwq, &barr, work, worker);
}

- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);

if (unlikely(worker)) {
wait_for_completion(&barr.done);
@@ -1518,13 +1542,13 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
*/
for_each_possible_cpu(cpu) {


struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

+ struct global_cwq *gcwq = get_gcwq(cpu);

BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
- cwq->cpu = cpu;
+ cwq->gcwq = gcwq;
cwq->wq = wq;
cwq->flush_color = -1;
cwq->max_active = max_active;
- spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
INIT_LIST_HEAD(&cwq->delayed_works);
init_waitqueue_head(&cwq->more_work);
@@ -1698,7 +1722,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu);


* list instead of the cwq ones.

*
* CONTEXT:
- * Grabs and releases workqueue_lock and cwq->lock's.
+ * Grabs and releases workqueue_lock and gcwq->lock's.
*/
void freeze_workqueues_begin(void)
{
@@ -1711,16 +1735,18 @@ void freeze_workqueues_begin(void)
workqueue_freezing = true;

for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);


+
+ spin_lock_irq(&gcwq->lock);
+

list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

- spin_lock_irq(&cwq->lock);
-
if (wq->flags & WQ_FREEZEABLE)
cwq->max_active = 0;
-
- spin_unlock_irq(&cwq->lock);
}
+
+ spin_unlock_irq(&gcwq->lock);
}

spin_unlock(&workqueue_lock);
@@ -1779,7 +1805,7 @@ out_unlock:


* frozen works are transferred to their respective cwq worklists.

*
* CONTEXT:
- * Grabs and releases workqueue_lock and cwq->lock's.
+ * Grabs and releases workqueue_lock and gcwq->lock's.
*/
void thaw_workqueues(void)
{
@@ -1792,14 +1818,16 @@ void thaw_workqueues(void)
goto out_unlock;

for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);


+
+ spin_lock_irq(&gcwq->lock);
+

list_for_each_entry(wq, &workqueues, list) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

if (!(wq->flags & WQ_FREEZEABLE))

continue;

- spin_lock_irq(&cwq->lock);
-


/* restore max_active and repopulate worklist */

cwq->max_active = wq->saved_max_active;

@@ -1808,9 +1836,9 @@ void thaw_workqueues(void)
cwq_activate_first_delayed(cwq);

wake_up(&cwq->more_work);
-
- spin_unlock_irq(&cwq->lock);
}
+
+ spin_unlock_irq(&gcwq->lock);
}

workqueue_freezing = false;
@@ -1831,11 +1859,19 @@ void __init init_workqueues(void)
BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
__alignof__(unsigned long long));

- for_each_possible_cpu(cpu)
- ida_init(&per_cpu(worker_ida, cpu));
-
singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, 0);
+
+ /* initialize gcwqs */
+ for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+
+ spin_lock_init(&gcwq->lock);
+ gcwq->cpu = cpu;
+
+ ida_init(&gcwq->worker_ida);
+ }
+


keventd_wq = create_workqueue("events");

BUG_ON(!keventd_wq);

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
Now that cmwq can handle high concurrency, there's no reason to
implement a separate thread pool for async. Introduce an alternative
implementation based on workqueue.

The new implementation uses two workqueues - async_wq and
async_ordered_wq. The former is multithreaded and the latter is
singlethreaded. async_call() schedules unordered asynchronous
execution on async_wq. async_call_ordered() schedules ordered execution
on async_ordered_wq. Functions scheduled using the ordered variant
are guaranteed to be executed only after all async executions
scheduled previously have finished.

This patch doesn't convert any existing user.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@infradead.org>
---
drivers/base/core.c | 1 +
drivers/base/dd.c | 1 +
include/linux/async.h | 6 ++
init/do_mounts.c | 1 +
init/main.c | 1 +
kernel/async.c | 147 ++++++++++++++++++++++++++++++++++++++++++++++++
kernel/irq/autoprobe.c | 1 +
kernel/module.c | 2 +
8 files changed, 160 insertions(+), 0 deletions(-)

diff --git a/drivers/base/core.c b/drivers/base/core.c
index 2820257..14774c9 100644
--- a/drivers/base/core.c
+++ b/drivers/base/core.c
@@ -1744,4 +1744,5 @@ void device_shutdown(void)
}
}
async_synchronize_full();
+ async_barrier();
}
diff --git a/drivers/base/dd.c b/drivers/base/dd.c
index ee95c76..5c9c923 100644
--- a/drivers/base/dd.c
+++ b/drivers/base/dd.c
@@ -179,6 +179,7 @@ void wait_for_device_probe(void)
/* wait for the known devices to complete their probing */
wait_event(probe_waitqueue, atomic_read(&probe_count) == 0);
async_synchronize_full();
+ async_barrier();
}
EXPORT_SYMBOL_GPL(wait_for_device_probe);

diff --git a/include/linux/async.h b/include/linux/async.h
index 68a9530..49658dc 100644
--- a/include/linux/async.h
+++ b/include/linux/async.h
@@ -12,6 +12,7 @@

#include <linux/types.h>
#include <linux/list.h>
+#include <linux/workqueue.h>

typedef u64 async_cookie_t;
typedef void (async_func_ptr) (void *data, async_cookie_t cookie);
@@ -25,3 +26,8 @@ extern void async_synchronize_cookie(async_cookie_t cookie);
extern void async_synchronize_cookie_domain(async_cookie_t cookie,
struct list_head *list);

+typedef void (*async_func_t)(void *data);
+
+extern bool async_call(async_func_t func, void *data);
+extern bool async_call_ordered(async_func_t func, void *data);
+extern void async_barrier(void);
diff --git a/init/do_mounts.c b/init/do_mounts.c
index bb008d0..608ac17 100644
--- a/init/do_mounts.c
+++ b/init/do_mounts.c
@@ -406,6 +406,7 @@ void __init prepare_namespace(void)
(ROOT_DEV = name_to_dev_t(saved_root_name)) == 0)
msleep(100);
async_synchronize_full();
+ async_barrier();
}

is_floppy = MAJOR(ROOT_DEV) == FLOPPY_MAJOR;
diff --git a/init/main.c b/init/main.c
index adb09f8..e35dfdd 100644
--- a/init/main.c
+++ b/init/main.c
@@ -802,6 +802,7 @@ static noinline int init_post(void)
{
/* need to finish all async __init code before freeing the memory */
async_synchronize_full();
+ async_barrier();
free_initmem();
unlock_kernel();
mark_rodata_ro();
diff --git a/kernel/async.c b/kernel/async.c
index 27235f5..4cd52bc 100644
--- a/kernel/async.c
+++ b/kernel/async.c
@@ -395,3 +395,150 @@ static int __init async_init(void)
}

core_initcall(async_init);
+
+/* One asynchronous invocation of @func(@data), queued as a work item. */
+struct async_ent {
+ struct work_struct work; /* queued on async_wq or async_ordered_wq */
+ async_func_t func; /* function to invoke */
+ void *data; /* argument passed to @func */
+ bool ordered; /* flush async_wq before invoking @func */
+};
+
+static struct workqueue_struct *async_wq;
+static struct workqueue_struct *async_ordered_wq;
+
+/*
+ * Work function shared by async_wq and async_ordered_wq. Runs
+ * ent->func(ent->data) and then frees the async_ent which carried
+ * the request. For ordered entries, async_wq is flushed first so
+ * that previously scheduled unordered functions have finished
+ * before @func runs.
+ */
+static void async_work_func(struct work_struct *work)
+{
+ struct async_ent *ent = container_of(work, struct async_ent, work);
+ ktime_t calltime, delta, rettime;
+
+ if (initcall_debug && system_state == SYSTEM_BOOTING) {
+ printk("calling %pF @ %i\n",
+ ent->func, task_pid_nr(current));
+ calltime = ktime_get();
+ }
+
+ if (ent->ordered)
+ flush_workqueue(async_wq);
+
+ ent->func(ent->data);
+
+ if (initcall_debug && system_state == SYSTEM_BOOTING) {
+ rettime = ktime_get();
+ delta = ktime_sub(rettime, calltime);
+ /* ns >> 10 is a cheap approximation of ns / 1000 */
+ printk("initcall %pF returned 0 after %lld usecs\n",
+ ent->func, (long long)ktime_to_ns(delta) >> 10);
+ }
+
+ /*
+ * @ent was allocated by __async_call() and nobody else holds a
+ * reference to it. Free it here, after the last use of
+ * ent->func above, or every async call leaks an async_ent.
+ */
+ kfree(ent);
+}
+
+/*
+ * Common implementation of async_call() and async_call_ordered().
+ *
+ * Allocates an async_ent for @func(@data) and queues it on
+ * async_ordered_wq if @ordered, on async_wq otherwise. If the
+ * allocation fails, @func(@data) is executed directly in the caller's
+ * context.
+ *
+ * RETURNS:
+ * %true if the work was queued, %false if @func was executed locally.
+ */
+static bool __async_call(async_func_t func, void *data, bool ordered)
+{
+ struct async_ent *ent;
+
+ ent = kzalloc(sizeof(*ent), GFP_ATOMIC);
+ if (!ent) {
+ /*
+ * Nothing to free on this path. Fall back to synchronous
+ * execution. For ordered calls, wait for everything
+ * queued so far so that the ordering guarantee holds.
+ */
+ if (ordered) {
+ flush_workqueue(async_wq);
+ flush_workqueue(async_ordered_wq);
+ }
+ func(data);
+ return false;
+ }
+
+ ent->func = func;
+ ent->data = data;
+ ent->ordered = ordered;
+ /*
+ * Use separate INIT_WORK for ordered and unordered so that they
+ * end up with different lockdep keys.
+ */
+ if (ordered) {
+ INIT_WORK(&ent->work, async_work_func);
+ queue_work(async_ordered_wq, &ent->work);
+ } else {
+ INIT_WORK(&ent->work, async_work_func);
+ queue_work(async_wq, &ent->work);
+ }
+ return true;
+}
+

+/**
+ * async_call - schedule a function for asynchronous execution
+ * @func: function to execute asynchronously
+ * @data: data pointer to pass to the function
+ *
+ * Schedule @func(@data) for unordered asynchronous execution. If the
+ * memory allocation needed to queue the call fails, @func(@data) is
+ * called directly from the caller's context instead.
+ *
+ * CONTEXT:
+ * Don't care, but keep in mind that @func may be executed directly.
+ *
+ * RETURNS:
+ * %true if async execution is scheduled, %false if executed locally.
+ */
+bool async_call(async_func_t func, void *data)
+{
+ return __async_call(func, data, false);
+}
+EXPORT_SYMBOL_GPL(async_call);
+
+/**
+ * async_call_ordered - schedule ordered asynchronous execution
+ * @func: function to execute asynchronously
+ * @data: data pointer to pass to the function
+ *
+ * Schedule @func(@data) for ordered asynchronous execution. It will be
+ * executed only after all async functions scheduled up to this point
+ * have finished. If the memory allocation needed to queue the call
+ * fails, the pending async work is flushed and @func(@data) is called
+ * directly from the caller's context.
+ *
+ * CONTEXT:
+ * Might sleep.
+ *
+ * RETURNS:
+ * %true if async execution is scheduled, %false if executed locally.
+ */
+bool async_call_ordered(async_func_t func, void *data)
+{
+ might_sleep();
+ return __async_call(func, data, true);
+}
+EXPORT_SYMBOL_GPL(async_call_ordered);
+
+/**
+ * async_barrier - asynchronous execution barrier
+ *
+ * Wait till all currently scheduled async executions are finished by
+ * flushing async_wq and then async_ordered_wq. When initcall_debug
+ * is set during boot, the time spent waiting is logged.
+ *
+ * CONTEXT:
+ * Might sleep.
+ */
+void async_barrier(void)
+{
+ ktime_t starttime, delta, endtime;
+
+ if (initcall_debug && system_state == SYSTEM_BOOTING) {
+ printk("async_waiting @ %i\n", task_pid_nr(current));
+ starttime = ktime_get();
+ }
+
+ flush_workqueue(async_wq);
+ flush_workqueue(async_ordered_wq);
+
+ if (initcall_debug && system_state == SYSTEM_BOOTING) {
+ endtime = ktime_get();
+ delta = ktime_sub(endtime, starttime);
+ /* ns >> 10 is a cheap approximation of ns / 1000 */
+ printk("async_continuing @ %i after %lli usec\n",
+ task_pid_nr(current),
+ (long long)ktime_to_ns(delta) >> 10);
+ }
+}
+EXPORT_SYMBOL_GPL(async_barrier);
+
+/*
+ * Create the two workqueues backing async_call() and
+ * async_call_ordered(). Failure to create either one is fatal.
+ */
+static int __init init_async(void)
+{
+ async_wq = __create_workqueue("async", 0, WQ_MAX_ACTIVE);
+ async_ordered_wq = create_singlethread_workqueue("async_ordered");
+ BUG_ON(!async_wq || !async_ordered_wq);
+ return 0;
+}
+core_initcall(init_async);
diff --git a/kernel/irq/autoprobe.c b/kernel/irq/autoprobe.c
index 2295a31..39188cd 100644
--- a/kernel/irq/autoprobe.c
+++ b/kernel/irq/autoprobe.c
@@ -39,6 +39,7 @@ unsigned long probe_irq_on(void)
* quiesce the kernel, or at least the asynchronous portion
*/
async_synchronize_full();
+ async_barrier();
mutex_lock(&probing_active);
/*
* something may have generated an irq long ago and we want to
diff --git a/kernel/module.c b/kernel/module.c
index f82386b..623a9b6 100644
--- a/kernel/module.c
+++ b/kernel/module.c
@@ -717,6 +717,7 @@ SYSCALL_DEFINE2(delete_module, const char __user *, name_user,
blocking_notifier_call_chain(&module_notify_list,
MODULE_STATE_GOING, mod);
async_synchronize_full();
+ async_barrier();
mutex_lock(&module_mutex);
/* Store the name of the last unloaded module for diagnostic purposes */
strlcpy(last_unloaded_module, mod->name, sizeof(last_unloaded_module));
@@ -2494,6 +2495,7 @@ SYSCALL_DEFINE3(init_module, void __user *, umod,

/* We need to finish all async code before the module init sequence is done */
async_synchronize_full();
+ async_barrier();

mutex_lock(&module_mutex);
/* Drop initial reference. */

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
The work->data field is used for two purposes. It points to the cwq the
work is queued on and the lower bits are used for flags. Currently, two
bits are reserved, which is always safe as 4 byte alignment is guaranteed
on every architecture. However, future changes will need more flag bits.

On SMP, the percpu allocator is capable of honoring larger alignment
(there are other users which depend on it) and larger alignment works
just fine. On UP, the percpu allocator is a thin wrapper around
kzalloc/kfree() and doesn't honor alignment requests.

This patch introduces WORK_STRUCT_FLAG_BITS and implements
alloc/free_cwqs() which guarantee (1 << WORK_STRUCT_FLAG_BITS)
alignment both on SMP and UP. On SMP, simply wrapping the percpu
allocator is enough. On UP, extra space is allocated so that the cwq can
be aligned and the original pointer can be stored after it, which is
used in the free path.

While at it, as cwqs are now forced aligned, make sure the resulting
alignment is at least equal to or larger than that of long long.

Alignment problem on UP is reported by Michal Simek.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Christoph Lameter <c...@linux-foundation.org>
Cc: Ingo Molnar <mi...@elte.hu>
Reported-by: Michal Simek <michal...@petalogix.com>
---
include/linux/workqueue.h | 4 ++-
kernel/workqueue.c | 59 +++++++++++++++++++++++++++++++++++++++++----
2 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index e51b5dc..011738a 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -29,7 +29,9 @@ enum {


WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,

- WORK_STRUCT_FLAG_MASK = 3UL,
+ WORK_STRUCT_FLAG_BITS = 2,
+
+ WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1,
WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
};

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index d29e069..14edfcd 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -46,7 +46,9 @@



/*
* The per-CPU workqueue (if single thread, we always use the first

- * possible cpu).
+ * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
+ * work_struct->data are used for flags and thus cwqs need to be
+ * aligned to (1 << WORK_STRUCT_FLAG_BITS) bytes.
*/
struct cpu_workqueue_struct {

@@ -58,7 +60,7 @@ struct cpu_workqueue_struct {



struct workqueue_struct *wq; /* I: the owning workqueue */

struct task_struct *thread;
-} ____cacheline_aligned;
+};

/*
* The externally visible workqueue abstraction is an array of
@@ -930,6 +932,44 @@ int current_is_keventd(void)

}

+/*
+ * Allocate the percpu cwqs of a workqueue, aligned to
+ * (1 << WORK_STRUCT_FLAG_BITS) bytes so that the lower bits of
+ * work_struct->data stay available for flags.
+ *
+ * RETURNS:
+ * Pointer to the aligned percpu cwqs, or NULL if the allocation
+ * failed. Must be released with free_cwqs().
+ */
+static struct cpu_workqueue_struct *alloc_cwqs(void)
+{
+ const size_t size = sizeof(struct cpu_workqueue_struct);
+ const size_t align = 1 << WORK_STRUCT_FLAG_BITS;
+ struct cpu_workqueue_struct *cwqs;
+#ifndef CONFIG_SMP
+ void *ptr;
+
+ /*
+ * On UP, percpu allocator doesn't honor alignment parameter
+ * and simply uses arch-dependent default. Allocate enough
+ * room to align cwq and put an extra pointer at the end
+ * pointing back to the originally allocated pointer which
+ * will be used for free.
+ */
+ ptr = __alloc_percpu(size + align + sizeof(void *), 1);
+ /* don't write the back pointer through a failed allocation */
+ if (!ptr)
+ return NULL;
+
+ cwqs = PTR_ALIGN(ptr, align);
+ *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
+#else
+ /* On SMP, percpu allocator can do it itself */
+ cwqs = __alloc_percpu(size, align);
+#endif
+ /* just in case, make sure it's actually aligned */
+ BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
+ return cwqs;
+}
+
+/*
+ * Free cwqs allocated by alloc_cwqs(). On UP, a NULL @cwqs is
+ * explicitly ignored; on SMP, @cwqs is handed to free_percpu() as is.
+ */
+static void free_cwqs(struct cpu_workqueue_struct *cwqs)
+{
+#ifndef CONFIG_SMP
+ /*
+ * on UP, the pointer to free is stored right after the cwq; it
+ * is the unaligned pointer originally returned by the allocator
+ */
+ if (cwqs)
+ free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
+#else
+ free_percpu(cwqs);
+#endif
+}
+
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct workqueue_struct *wq = cwq->wq;
@@ -975,7 +1015,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
if (!wq)
goto err;

- wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
+ wq->cpu_wq = alloc_cwqs();
if (!wq->cpu_wq)
goto err;

@@ -994,6 +1034,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,


for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);

+ BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
cwq->wq = wq;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
@@ -1021,7 +1062,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
return wq;
err:
if (wq) {
- free_percpu(wq->cpu_wq);
+ free_cwqs(wq->cpu_wq);
kfree(wq);
}
return NULL;
@@ -1072,7 +1113,7 @@ void destroy_workqueue(struct workqueue_struct *wq)
for_each_possible_cpu(cpu)
cleanup_workqueue_thread(get_cwq(cpu, wq));

- free_percpu(wq->cpu_wq);
+ free_cwqs(wq->cpu_wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
@@ -1159,6 +1200,14 @@ EXPORT_SYMBOL_GPL(work_on_cpu);

void __init init_workqueues(void)
{
+ /*
+ * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
+ * Make sure that the alignment isn't lower than that of
+ * unsigned long long.
+ */
+ BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
+ __alignof__(unsigned long long));
+


singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, 0);

keventd_wq = create_workqueue("events");

Tejun Heo

unread,
Jan 17, 2010, 8:00:03 PM1/17/10
to
Workqueue can now handle high concurrency. Use system_long_wq instead
of slow-work.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Steven Whitehouse <swhi...@redhat.com>
---
fs/gfs2/Kconfig | 1 -
fs/gfs2/incore.h | 3 +-
fs/gfs2/main.c | 9 +-------
fs/gfs2/ops_fstype.c | 8 +++---
fs/gfs2/recovery.c | 52 ++++++++++++++++++-------------------------------
fs/gfs2/recovery.h | 4 +-
fs/gfs2/sys.c | 3 +-
7 files changed, 29 insertions(+), 51 deletions(-)

diff --git a/fs/gfs2/Kconfig b/fs/gfs2/Kconfig
index 4dcddf8..f51f1bb 100644
--- a/fs/gfs2/Kconfig
+++ b/fs/gfs2/Kconfig
@@ -7,7 +7,6 @@ config GFS2_FS
select IP_SCTP if DLM_SCTP
select FS_POSIX_ACL
select CRC32
- select SLOW_WORK
select QUOTA
select QUOTACTL
help
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index 4792200..b99d5be 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -12,7 +12,6 @@

#include <linux/fs.h>
#include <linux/workqueue.h>
-#include <linux/slow-work.h>
#include <linux/dlm.h>
#include <linux/buffer_head.h>

@@ -383,7 +382,7 @@ struct gfs2_journal_extent {
struct gfs2_jdesc {
struct list_head jd_list;
struct list_head extent_list;
- struct slow_work jd_work;
+ struct work_struct jd_work;
struct inode *jd_inode;
unsigned long jd_flags;
#define JDF_RECOVERY 1
diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c
index 5b31f77..f809842 100644
--- a/fs/gfs2/main.c
+++ b/fs/gfs2/main.c
@@ -15,7 +15,6 @@
#include <linux/init.h>
#include <linux/gfs2_ondisk.h>
#include <asm/atomic.h>
-#include <linux/slow-work.h>

#include "gfs2.h"
#include "incore.h"
@@ -114,18 +113,12 @@ static int __init init_gfs2_fs(void)
if (error)
goto fail_unregister;

- error = slow_work_register_user(THIS_MODULE);
- if (error)
- goto fail_slow;
-
gfs2_register_debugfs();

printk("GFS2 (built %s %s) installed\n", __DATE__, __TIME__);

return 0;

-fail_slow:
- unregister_filesystem(&gfs2meta_fs_type);
fail_unregister:
unregister_filesystem(&gfs2_fs_type);
fail:
@@ -163,7 +156,7 @@ static void __exit exit_gfs2_fs(void)
gfs2_unregister_debugfs();
unregister_filesystem(&gfs2_fs_type);
unregister_filesystem(&gfs2meta_fs_type);
- slow_work_unregister_user(THIS_MODULE);
+ flush_workqueue(system_long_wq);

kmem_cache_destroy(gfs2_quotad_cachep);
kmem_cache_destroy(gfs2_rgrpd_cachep);
diff --git a/fs/gfs2/ops_fstype.c b/fs/gfs2/ops_fstype.c
index edfee24..58e130e 100644
--- a/fs/gfs2/ops_fstype.c
+++ b/fs/gfs2/ops_fstype.c
@@ -17,7 +17,6 @@
#include <linux/namei.h>
#include <linux/mount.h>
#include <linux/gfs2_ondisk.h>
-#include <linux/slow-work.h>
#include <linux/quotaops.h>

#include "gfs2.h"
@@ -673,7 +672,7 @@ static int gfs2_jindex_hold(struct gfs2_sbd *sdp, struct gfs2_holder *ji_gh)
break;

INIT_LIST_HEAD(&jd->extent_list);
- slow_work_init(&jd->jd_work, &gfs2_recover_ops);
+ INIT_WORK(&jd->jd_work, gfs2_recover_func);
jd->jd_inode = gfs2_lookupi(sdp->sd_jindex, &name, 1);
if (!jd->jd_inode || IS_ERR(jd->jd_inode)) {
if (!jd->jd_inode)
@@ -778,7 +777,8 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)
if (sdp->sd_lockstruct.ls_first) {
unsigned int x;
for (x = 0; x < sdp->sd_journals; x++) {
- error = gfs2_recover_journal(gfs2_jdesc_find(sdp, x));
+ error = gfs2_recover_journal(gfs2_jdesc_find(sdp, x),
+ true);
if (error) {
fs_err(sdp, "error recovering journal %u: %d\n",
x, error);
@@ -788,7 +788,7 @@ static int init_journal(struct gfs2_sbd *sdp, int undo)

gfs2_others_may_mount(sdp);
} else if (!sdp->sd_args.ar_spectator) {
- error = gfs2_recover_journal(sdp->sd_jdesc);
+ error = gfs2_recover_journal(sdp->sd_jdesc, true);
if (error) {
fs_err(sdp, "error recovering my journal: %d\n", error);
goto fail_jinode_gh;
diff --git a/fs/gfs2/recovery.c b/fs/gfs2/recovery.c
index 4b9bece..906f1ee 100644
--- a/fs/gfs2/recovery.c
+++ b/fs/gfs2/recovery.c
@@ -14,7 +14,6 @@
#include <linux/buffer_head.h>
#include <linux/gfs2_ondisk.h>
#include <linux/crc32.h>
-#include <linux/slow-work.h>

#include "gfs2.h"
#include "incore.h"
@@ -443,23 +442,7 @@ static void gfs2_recovery_done(struct gfs2_sbd *sdp, unsigned int jid,
kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp);
}

-static int gfs2_recover_get_ref(struct slow_work *work)
-{
- struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
- if (test_and_set_bit(JDF_RECOVERY, &jd->jd_flags))
- return -EBUSY;
- return 0;
-}
-
-static void gfs2_recover_put_ref(struct slow_work *work)
-{
- struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
- clear_bit(JDF_RECOVERY, &jd->jd_flags);
- smp_mb__after_clear_bit();
- wake_up_bit(&jd->jd_flags, JDF_RECOVERY);
-}
-
-static void gfs2_recover_work(struct slow_work *work)
+void gfs2_recover_func(struct work_struct *work)
{
struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
@@ -578,7 +561,7 @@ static void gfs2_recover_work(struct slow_work *work)
gfs2_glock_dq_uninit(&j_gh);

fs_info(sdp, "jid=%u: Done\n", jd->jd_jid);
- return;
+ goto done;

fail_gunlock_tr:
gfs2_glock_dq_uninit(&t_gh);
@@ -590,32 +573,35 @@ fail_gunlock_j:
}

fs_info(sdp, "jid=%u: %s\n", jd->jd_jid, (error) ? "Failed" : "Done");
-
fail:
gfs2_recovery_done(sdp, jd->jd_jid, LM_RD_GAVEUP);
+done:
+ clear_bit(JDF_RECOVERY, &jd->jd_flags);
+ smp_mb__after_clear_bit();
+ wake_up_bit(&jd->jd_flags, JDF_RECOVERY);
}

-struct slow_work_ops gfs2_recover_ops = {
- .owner = THIS_MODULE,
- .get_ref = gfs2_recover_get_ref,
- .put_ref = gfs2_recover_put_ref,
- .execute = gfs2_recover_work,
-};
-
-
static int gfs2_recovery_wait(void *word)
{
schedule();
return 0;
}

-int gfs2_recover_journal(struct gfs2_jdesc *jd)
+int gfs2_recover_journal(struct gfs2_jdesc *jd, bool wait)
{
int rv;
- rv = slow_work_enqueue(&jd->jd_work);
- if (rv)
- return rv;
- wait_on_bit(&jd->jd_flags, JDF_RECOVERY, gfs2_recovery_wait, TASK_UNINTERRUPTIBLE);
+
+ if (test_and_set_bit(JDF_RECOVERY, &jd->jd_flags))
+ return -EBUSY;
+
+ /* we have JDF_RECOVERY, queue should always succeed */
+ rv = queue_work(system_long_wq, &jd->jd_work);
+ BUG_ON(!rv);
+
+ if (wait)
+ wait_on_bit(&jd->jd_flags, JDF_RECOVERY, gfs2_recovery_wait,
+ TASK_UNINTERRUPTIBLE);
+
return 0;
}

diff --git a/fs/gfs2/recovery.h b/fs/gfs2/recovery.h
index 1616ac2..78abd50 100644
--- a/fs/gfs2/recovery.h
+++ b/fs/gfs2/recovery.h
@@ -27,8 +27,8 @@ extern void gfs2_revoke_clean(struct gfs2_sbd *sdp);

extern int gfs2_find_jhead(struct gfs2_jdesc *jd,
struct gfs2_log_header_host *head);
-extern int gfs2_recover_journal(struct gfs2_jdesc *gfs2_jd);
-extern struct slow_work_ops gfs2_recover_ops;
+extern int gfs2_recover_journal(struct gfs2_jdesc *gfs2_jd, bool wait);
+extern void gfs2_recover_func(struct work_struct *work);

#endif /* __RECOVERY_DOT_H__ */

diff --git a/fs/gfs2/sys.c b/fs/gfs2/sys.c
index 0dc3462..ee0f3cd 100644
--- a/fs/gfs2/sys.c
+++ b/fs/gfs2/sys.c
@@ -26,6 +26,7 @@
#include "quota.h"
#include "util.h"
#include "glops.h"
+#include "recovery.h"

struct gfs2_attr {
struct attribute attr;
@@ -351,7 +352,7 @@ static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
if (jd->jd_jid != jid)
continue;
- rv = slow_work_enqueue(&jd->jd_work);
+ rv = gfs2_recover_journal(jd, false);
break;
}
out:

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
slow-work doesn't have any user left. Kill it.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: David Howells <dhow...@redhat.com>
---
Documentation/slow-work.txt | 322 -------------
include/linux/slow-work.h | 163 -------
init/Kconfig | 24 -
kernel/Makefile | 2 -
kernel/slow-work-debugfs.c | 227 ---------
kernel/slow-work.c | 1068 -------------------------------------------
kernel/slow-work.h | 72 ---
kernel/sysctl.c | 8 -
8 files changed, 0 insertions(+), 1886 deletions(-)
delete mode 100644 Documentation/slow-work.txt
delete mode 100644 include/linux/slow-work.h
delete mode 100644 kernel/slow-work-debugfs.c
delete mode 100644 kernel/slow-work.c
delete mode 100644 kernel/slow-work.h

diff --git a/Documentation/slow-work.txt b/Documentation/slow-work.txt
deleted file mode 100644
index 9dbf447..0000000
--- a/Documentation/slow-work.txt
+++ /dev/null
@@ -1,322 +0,0 @@
- ====================================
- SLOW WORK ITEM EXECUTION THREAD POOL
- ====================================
-
-By: David Howells <dhow...@redhat.com>
-
-The slow work item execution thread pool is a pool of threads for performing
-things that take a relatively long time, such as making mkdir calls.
-Typically, when processing something, these items will spend a lot of time
-blocking a thread on I/O, thus making that thread unavailable for doing other
-work.
-
-The standard workqueue model is unsuitable for this class of work item as that
-limits the owner to a single thread or a single thread per CPU. For some
-tasks, however, more threads - or fewer - are required.
-
-There is just one pool per system. It contains no threads unless something
-wants to use it - and that something must register its interest first. When
-the pool is active, the number of threads it contains is dynamic, varying
-between a maximum and minimum setting, depending on the load.
-
-
-====================
-CLASSES OF WORK ITEM
-====================
-
-This pool supports two classes of work items:
-
- (*) Slow work items.
-
- (*) Very slow work items.
-
-The former are expected to finish much quicker than the latter.
-
-An operation of the very slow class may do a batch combination of several
-lookups, mkdirs, and a create for instance.
-
-An operation of the ordinarily slow class may, for example, write stuff or
-expand files, provided the time taken to do so isn't too long.
-
-Operations of both types may sleep during execution, thus tying up the thread
-loaned to it.
-
-A further class of work item is available, based on the slow work item class:
-
- (*) Delayed slow work items.
-
-These are slow work items that have a timer to defer queueing of the item for
-a while.
-
-
-THREAD-TO-CLASS ALLOCATION
---------------------------
-
-Not all the threads in the pool are available to work on very slow work items.
-The number will be between one and one fewer than the number of active threads.
-This is configurable (see the "Pool Configuration" section).
-
-All the threads are available to work on ordinarily slow work items, but a
-percentage of the threads will prefer to work on very slow work items.
-
-The configuration ensures that at least one thread will be available to work on
-very slow work items, and at least one thread will be available that won't work
-on very slow work items at all.
-
-
-=====================
-USING SLOW WORK ITEMS
-=====================
-
-Firstly, a module or subsystem wanting to make use of slow work items must
-register its interest:
-
- int ret = slow_work_register_user(struct module *module);
-
-This will return 0 if successful, or a -ve error upon failure. The module
-pointer should be the module interested in using this facility (almost
-certainly THIS_MODULE).
-
-
-Slow work items may then be set up by:
-
- (1) Declaring a slow_work struct type variable:
-
- #include <linux/slow-work.h>
-
- struct slow_work myitem;
-
- (2) Declaring the operations to be used for this item:
-
- struct slow_work_ops myitem_ops = {
- .get_ref = myitem_get_ref,
- .put_ref = myitem_put_ref,
- .execute = myitem_execute,
- };
-
- [*] For a description of the ops, see section "Item Operations".
-
- (3) Initialising the item:
-
- slow_work_init(&myitem, &myitem_ops);
-
- or:
-
- delayed_slow_work_init(&myitem, &myitem_ops);
-
- or:
-
- vslow_work_init(&myitem, &myitem_ops);
-
- depending on its class.
-
-A suitably set up work item can then be enqueued for processing:
-
- int ret = slow_work_enqueue(&myitem);
-
-This will return a -ve error if the thread pool is unable to gain a reference
-on the item, 0 otherwise, or (for delayed work):
-
- int ret = delayed_slow_work_enqueue(&myitem, my_jiffy_delay);
-
-
-The items are reference counted, so there ought to be no need for a flush
-operation. But as the reference counting is optional, means to cancel
-existing work items are also included:
-
- cancel_slow_work(&myitem);
- cancel_delayed_slow_work(&myitem);
-
-can be used to cancel pending work. The above cancel functions wait for
-existing work to have been executed (or prevent its execution, depending
-on timing).
-
-
-When all a module's slow work items have been processed, and the
-module has no further interest in the facility, it should unregister its
-interest:
-
- slow_work_unregister_user(struct module *module);
-
-The module pointer is used to wait for all outstanding work items for that
-module before completing the unregistration. This prevents the put_ref() code
-from being taken away before it completes. module should almost certainly be
-THIS_MODULE.
-
-
-================
-HELPER FUNCTIONS
-================
-
-The slow-work facility provides a function by which it can be determined
-whether or not an item is queued for later execution:
-
- bool queued = slow_work_is_queued(struct slow_work *work);
-
-If it returns false, then the item is not on the queue (it may be executing
-with a requeue pending). This can be used to work out whether an item on which
-another depends is on the queue, thus allowing a dependent item to be queued
-after it.
-
-If the above shows an item on which another depends not to be queued, then the
-owner of the dependent item might need to wait. However, to avoid locking up
-the threads unnecessarily by sleeping in them, it can make sense under some
-circumstances to return the work item to the queue, thus deferring it until
-some other items have had a chance to make use of the yielded thread.
-
-To yield a thread and defer an item, the work function should simply enqueue
-the work item again and return. However, this doesn't work if there's nothing
-actually on the queue, as the thread just vacated will jump straight back into
-the item's work function, thus busy waiting on a CPU.
-
-Instead, the item should use the thread to wait for the dependency to go away,
-but rather than using schedule() or schedule_timeout() to sleep, it should use
-the following function:
-
- bool requeue = slow_work_sleep_till_thread_needed(
- struct slow_work *work,
- signed long *_timeout);
-
-This will add a second wait and then sleep, such that it will be woken up if
-either something appears on the queue that could usefully make use of the
-thread - and behind which this item can be queued, or if the event the caller
-set up to wait for happens. True will be returned if something else appeared
-on the queue and this work function should perhaps return, or false if
-something else woke it up. The timeout is as for schedule_timeout().
-
-For example:
-
- wq = bit_waitqueue(&my_flags, MY_BIT);
- init_wait(&wait);
- requeue = false;
- do {
- prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE);
- if (!test_bit(MY_BIT, &my_flags))
- break;
- requeue = slow_work_sleep_till_thread_needed(&my_work,
- &timeout);
- } while (timeout > 0 && !requeue);
- finish_wait(wq, &wait);
- if (!test_bit(MY_BIT, &my_flags)
- goto do_my_thing;
- if (requeue)
- return; // to slow_work
-
-
-===============
-ITEM OPERATIONS
-===============
-
-Each work item requires a table of operations of type struct slow_work_ops.
-Only ->execute() is required; the getting and putting of a reference and the
-describing of an item are all optional.
-
- (*) Get a reference on an item:
-
- int (*get_ref)(struct slow_work *work);
-
- This allows the thread pool to attempt to pin an item by getting a
- reference on it. This function should return 0 if the reference was
- granted, or a -ve error otherwise. If an error is returned,
- slow_work_enqueue() will fail.
-
- The reference is held whilst the item is queued and whilst it is being
- executed. The item may then be requeued with the same reference held, or
- the reference will be released.
-
- (*) Release a reference on an item:
-
- void (*put_ref)(struct slow_work *work);
-
- This allows the thread pool to unpin an item by releasing the reference on
- it. The thread pool will not touch the item again once this has been
- called.
-
- (*) Execute an item:
-
- void (*execute)(struct slow_work *work);
-
- This should perform the work required of the item. It may sleep, it may
- perform disk I/O and it may wait for locks.
-
- (*) View an item through /proc:
-
- void (*desc)(struct slow_work *work, struct seq_file *m);
-
- If supplied, this should print to 'm' a small string describing the work
- the item is to do. This should be no more than about 40 characters, and
- shouldn't include a newline character.
-
- See the 'Viewing executing and queued items' section below.
-
-
-==================
-POOL CONFIGURATION
-==================
-
-The slow-work thread pool has a number of configurables:
-
- (*) /proc/sys/kernel/slow-work/min-threads
-
- The minimum number of threads that should be in the pool whilst it is in
- use. This may be anywhere between 2 and max-threads.
-
- (*) /proc/sys/kernel/slow-work/max-threads
-
- The maximum number of threads that should be in the pool. This may be
- anywhere between min-threads and 255 or NR_CPUS * 2, whichever is greater.
-
- (*) /proc/sys/kernel/slow-work/vslow-percentage
-
- The percentage of active threads in the pool that may be used to execute
- very slow work items. This may be between 1 and 99. The resultant number
- is bounded to between 1 and one fewer than the number of active threads.
- This ensures there is always at least one thread that can process very
- slow work items, and always at least one thread that won't.
-
-
-==================================
-VIEWING EXECUTING AND QUEUED ITEMS
-==================================
-
-If CONFIG_SLOW_WORK_DEBUG is enabled, a debugfs file is made available:
-
- /sys/kernel/debug/slow_work/runqueue
-
-through which the list of work items being executed and the queues of items to
-be executed may be viewed. The owner of a work item is given the chance to
-add some information of its own.
-
-The contents look something like the following:
-
- THR PID ITEM ADDR FL MARK DESC
- === ===== ================ == ===== ==========
- 0 3005 ffff880023f52348 a 952ms FSC: OBJ17d3: LOOK
- 1 3006 ffff880024e33668 2 160ms FSC: OBJ17e5 OP60d3b: Write1/Store fl=2
- 2 3165 ffff8800296dd180 a 424ms FSC: OBJ17e4: LOOK
- 3 4089 ffff8800262c8d78 a 212ms FSC: OBJ17ea: CRTN
- 4 4090 ffff88002792bed8 2 388ms FSC: OBJ17e8 OP60d36: Write1/Store fl=2
- 5 4092 ffff88002a0ef308 2 388ms FSC: OBJ17e7 OP60d2e: Write1/Store fl=2
- 6 4094 ffff88002abaf4b8 2 132ms FSC: OBJ17e2 OP60d4e: Write1/Store fl=2
- 7 4095 ffff88002bb188e0 a 388ms FSC: OBJ17e9: CRTN
- vsq - ffff880023d99668 1 308ms FSC: OBJ17e0 OP60f91: Write1/EnQ fl=2
- vsq - ffff8800295d1740 1 212ms FSC: OBJ16be OP4d4b6: Write1/EnQ fl=2
- vsq - ffff880025ba3308 1 160ms FSC: OBJ179a OP58dec: Write1/EnQ fl=2
- vsq - ffff880024ec83e0 1 160ms FSC: OBJ17ae OP599f2: Write1/EnQ fl=2
- vsq - ffff880026618e00 1 160ms FSC: OBJ17e6 OP60d33: Write1/EnQ fl=2
- vsq - ffff880025a2a4b8 1 132ms FSC: OBJ16a2 OP4d583: Write1/EnQ fl=2
- vsq - ffff880023cbe6d8 9 212ms FSC: OBJ17eb: LOOK
- vsq - ffff880024d37590 9 212ms FSC: OBJ17ec: LOOK
- vsq - ffff880027746cb0 9 212ms FSC: OBJ17ed: LOOK
- vsq - ffff880024d37ae8 9 212ms FSC: OBJ17ee: LOOK
- vsq - ffff880024d37cb0 9 212ms FSC: OBJ17ef: LOOK
- vsq - ffff880025036550 9 212ms FSC: OBJ17f0: LOOK
- vsq - ffff8800250368e0 9 212ms FSC: OBJ17f1: LOOK
- vsq - ffff880025036aa8 9 212ms FSC: OBJ17f2: LOOK
-
-In the 'THR' column, executing items show the thread they're occupying and
-queued items indicate which queue they're on. 'PID' shows the process ID of
-a slow-work thread that's executing something. 'FL' shows the work item flags.
-'MARK' indicates how long since an item was queued or began executing. Lastly,
-the 'DESC' column permits the owner of an item to give some information.
-
diff --git a/include/linux/slow-work.h b/include/linux/slow-work.h
deleted file mode 100644
index 13337bf..0000000
--- a/include/linux/slow-work.h
+++ /dev/null
@@ -1,163 +0,0 @@
-/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
- *
- * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhow...@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- *
- * See Documentation/slow-work.txt
- */
-
-#ifndef _LINUX_SLOW_WORK_H
-#define _LINUX_SLOW_WORK_H
-
-#ifdef CONFIG_SLOW_WORK
-
-#include <linux/sysctl.h>
-#include <linux/timer.h>
-
-struct slow_work;
-#ifdef CONFIG_SLOW_WORK_DEBUG
-struct seq_file;
-#endif
-
-/*
- * The operations used to support slow work items
- */
-struct slow_work_ops {
- /* owner */
- struct module *owner;
-
- /* get a ref on a work item
- * - return 0 if successful, -ve if not
- */
- int (*get_ref)(struct slow_work *work);
-
- /* discard a ref to a work item */
- void (*put_ref)(struct slow_work *work);
-
- /* execute a work item */
- void (*execute)(struct slow_work *work);
-
-#ifdef CONFIG_SLOW_WORK_DEBUG
- /* describe a work item for debugfs */
- void (*desc)(struct slow_work *work, struct seq_file *m);
-#endif
-};
-
-/*
- * A slow work item
- * - A reference is held on the parent object by the thread pool when it is
- * queued
- */
-struct slow_work {
- struct module *owner; /* the owning module */
- unsigned long flags;
-#define SLOW_WORK_PENDING 0 /* item pending (further) execution */
-#define SLOW_WORK_EXECUTING 1 /* item currently executing */
-#define SLOW_WORK_ENQ_DEFERRED 2 /* item enqueue deferred */
-#define SLOW_WORK_VERY_SLOW 3 /* item is very slow */
-#define SLOW_WORK_CANCELLING 4 /* item is being cancelled, don't enqueue */
-#define SLOW_WORK_DELAYED 5 /* item is struct delayed_slow_work with active timer */
- const struct slow_work_ops *ops; /* operations table for this item */
- struct list_head link; /* link in queue */
-#ifdef CONFIG_SLOW_WORK_DEBUG
- struct timespec mark; /* jiffies at which queued or exec begun */
-#endif
-};
-
-struct delayed_slow_work {
- struct slow_work work;
- struct timer_list timer;
-};
-
-/**
- * slow_work_init - Initialise a slow work item
- * @work: The work item to initialise
- * @ops: The operations to use to handle the slow work item
- *
- * Initialise a slow work item.
- */
-static inline void slow_work_init(struct slow_work *work,
- const struct slow_work_ops *ops)
-{
- work->flags = 0;
- work->ops = ops;
- INIT_LIST_HEAD(&work->link);
-}
-
-/**
- * slow_work_init - Initialise a delayed slow work item
- * @work: The work item to initialise
- * @ops: The operations to use to handle the slow work item
- *
- * Initialise a delayed slow work item.
- */
-static inline void delayed_slow_work_init(struct delayed_slow_work *dwork,
- const struct slow_work_ops *ops)
-{
- init_timer(&dwork->timer);
- slow_work_init(&dwork->work, ops);
-}
-
-/**
- * vslow_work_init - Initialise a very slow work item
- * @work: The work item to initialise
- * @ops: The operations to use to handle the slow work item
- *
- * Initialise a very slow work item. This item will be restricted such that
- * only a certain number of the pool threads will be able to execute items of
- * this type.
- */
-static inline void vslow_work_init(struct slow_work *work,
- const struct slow_work_ops *ops)
-{
- work->flags = 1 << SLOW_WORK_VERY_SLOW;
- work->ops = ops;
- INIT_LIST_HEAD(&work->link);
-}
-
-/**
- * slow_work_is_queued - Determine if a slow work item is on the work queue
- * work: The work item to test
- *
- * Determine if the specified slow-work item is on the work queue. This
- * returns true if it is actually on the queue.
- *
- * If the item is executing and has been marked for requeue when execution
- * finishes, then false will be returned.
- *
- * Anyone wishing to wait for completion of execution can wait on the
- * SLOW_WORK_EXECUTING bit.
- */
-static inline bool slow_work_is_queued(struct slow_work *work)
-{
- unsigned long flags = work->flags;
- return flags & SLOW_WORK_PENDING && !(flags & SLOW_WORK_EXECUTING);
-}
-
-extern int slow_work_enqueue(struct slow_work *work);
-extern void slow_work_cancel(struct slow_work *work);
-extern int slow_work_register_user(struct module *owner);
-extern void slow_work_unregister_user(struct module *owner);
-
-extern int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
- unsigned long delay);
-
-static inline void delayed_slow_work_cancel(struct delayed_slow_work *dwork)
-{
- slow_work_cancel(&dwork->work);
-}
-
-extern bool slow_work_sleep_till_thread_needed(struct slow_work *work,
- signed long *_timeout);
-
-#ifdef CONFIG_SYSCTL
-extern ctl_table slow_work_sysctls[];
-#endif
-
-#endif /* CONFIG_SLOW_WORK */
-#endif /* _LINUX_SLOW_WORK_H */
diff --git a/init/Kconfig b/init/Kconfig
index b1b7175..6bd0c83 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -1126,30 +1126,6 @@ config TRACEPOINTS

source "arch/Kconfig"

-config SLOW_WORK
- default n
- bool
- help
- The slow work thread pool provides a number of dynamically allocated
- threads that can be used by the kernel to perform operations that
- take a relatively long time.
-
- An example of this would be CacheFiles doing a path lookup followed
- by a series of mkdirs and a create call, all of which have to touch
- disk.
-
- See Documentation/slow-work.txt.
-
-config SLOW_WORK_DEBUG
- bool "Slow work debugging through debugfs"
- default n
- depends on SLOW_WORK && DEBUG_FS
- help
- Display the contents of the slow work run queue through debugfs,
- including items currently executing.
-
- See Documentation/slow-work.txt.
-
endmenu # General setup

config HAVE_GENERIC_DMA_COHERENT
diff --git a/kernel/Makefile b/kernel/Makefile
index 864ff75..99ce6f2 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -95,8 +95,6 @@ obj-$(CONFIG_TRACING) += trace/
obj-$(CONFIG_X86_DS) += trace/
obj-$(CONFIG_RING_BUFFER) += trace/
obj-$(CONFIG_SMP) += sched_cpupri.o
-obj-$(CONFIG_SLOW_WORK) += slow-work.o
-obj-$(CONFIG_SLOW_WORK_DEBUG) += slow-work-debugfs.o
obj-$(CONFIG_PERF_EVENTS) += perf_event.o
obj-$(CONFIG_HAVE_HW_BREAKPOINT) += hw_breakpoint.o
obj-$(CONFIG_USER_RETURN_NOTIFIER) += user-return-notifier.o
diff --git a/kernel/slow-work-debugfs.c b/kernel/slow-work-debugfs.c
deleted file mode 100644
index e45c436..0000000
--- a/kernel/slow-work-debugfs.c
+++ /dev/null
@@ -1,227 +0,0 @@
-/* Slow work debugging
- *
- * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhow...@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- */
-
-#include <linux/module.h>
-#include <linux/slow-work.h>
-#include <linux/fs.h>
-#include <linux/time.h>
-#include <linux/seq_file.h>
-#include "slow-work.h"
-
-#define ITERATOR_SHIFT (BITS_PER_LONG - 4)
-#define ITERATOR_SELECTOR (0xfUL << ITERATOR_SHIFT)
-#define ITERATOR_COUNTER (~ITERATOR_SELECTOR)
-
-void slow_work_new_thread_desc(struct slow_work *work, struct seq_file *m)
-{
- seq_puts(m, "Slow-work: New thread");
-}
-
-/*
- * Render the time mark field on a work item into a 5-char time with units plus
- * a space
- */
-static void slow_work_print_mark(struct seq_file *m, struct slow_work *work)
-{
- struct timespec now, diff;
-
- now = CURRENT_TIME;
- diff = timespec_sub(now, work->mark);
-
- if (diff.tv_sec < 0)
- seq_puts(m, " -ve ");
- else if (diff.tv_sec == 0 && diff.tv_nsec < 1000)
- seq_printf(m, "%3luns ", diff.tv_nsec);
- else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000)
- seq_printf(m, "%3luus ", diff.tv_nsec / 1000);
- else if (diff.tv_sec == 0 && diff.tv_nsec < 1000000000)
- seq_printf(m, "%3lums ", diff.tv_nsec / 1000000);
- else if (diff.tv_sec <= 1)
- seq_puts(m, " 1s ");
- else if (diff.tv_sec < 60)
- seq_printf(m, "%4lus ", diff.tv_sec);
- else if (diff.tv_sec < 60 * 60)
- seq_printf(m, "%4lum ", diff.tv_sec / 60);
- else if (diff.tv_sec < 60 * 60 * 24)
- seq_printf(m, "%4luh ", diff.tv_sec / 3600);
- else
- seq_puts(m, "exces ");
-}
-
-/*
- * Describe a slow work item for debugfs
- */
-static int slow_work_runqueue_show(struct seq_file *m, void *v)
-{
- struct slow_work *work;
- struct list_head *p = v;
- unsigned long id;
-
- switch ((unsigned long) v) {
- case 1:
- seq_puts(m, "THR PID ITEM ADDR FL MARK DESC\n");
- return 0;
- case 2:
- seq_puts(m, "=== ===== ================ == ===== ==========\n");
- return 0;
-
- case 3 ... 3 + SLOW_WORK_THREAD_LIMIT - 1:
- id = (unsigned long) v - 3;
-
- read_lock(&slow_work_execs_lock);
- work = slow_work_execs[id];
- if (work) {
- smp_read_barrier_depends();
-
- seq_printf(m, "%3lu %5d %16p %2lx ",
- id, slow_work_pids[id], work, work->flags);
- slow_work_print_mark(m, work);
-
- if (work->ops->desc)
- work->ops->desc(work, m);
- seq_putc(m, '\n');
- }
- read_unlock(&slow_work_execs_lock);
- return 0;
-
- default:
- work = list_entry(p, struct slow_work, link);
- seq_printf(m, "%3s - %16p %2lx ",
- work->flags & SLOW_WORK_VERY_SLOW ? "vsq" : "sq",
- work, work->flags);
- slow_work_print_mark(m, work);
-
- if (work->ops->desc)
- work->ops->desc(work, m);
- seq_putc(m, '\n');
- return 0;
- }
-}
-
-/*
- * map the iterator to a work item
- */
-static void *slow_work_runqueue_index(struct seq_file *m, loff_t *_pos)
-{
- struct list_head *p;
- unsigned long count, id;
-
- switch (*_pos >> ITERATOR_SHIFT) {
- case 0x0:
- if (*_pos == 0)
- *_pos = 1;
- if (*_pos < 3)
- return (void *)(unsigned long) *_pos;
- if (*_pos < 3 + SLOW_WORK_THREAD_LIMIT)
- for (id = *_pos - 3;
- id < SLOW_WORK_THREAD_LIMIT;
- id++, (*_pos)++)
- if (slow_work_execs[id])
- return (void *)(unsigned long) *_pos;
- *_pos = 0x1UL << ITERATOR_SHIFT;
-
- case 0x1:
- count = *_pos & ITERATOR_COUNTER;
- list_for_each(p, &slow_work_queue) {
- if (count == 0)
- return p;
- count--;
- }
- *_pos = 0x2UL << ITERATOR_SHIFT;
-
- case 0x2:
- count = *_pos & ITERATOR_COUNTER;
- list_for_each(p, &vslow_work_queue) {
- if (count == 0)
- return p;
- count--;
- }
- *_pos = 0x3UL << ITERATOR_SHIFT;
-
- default:
- return NULL;
- }
-}
-
-/*
- * set up the iterator to start reading from the first line
- */
-static void *slow_work_runqueue_start(struct seq_file *m, loff_t *_pos)
-{
- spin_lock_irq(&slow_work_queue_lock);
- return slow_work_runqueue_index(m, _pos);
-}
-
-/*
- * move to the next line
- */
-static void *slow_work_runqueue_next(struct seq_file *m, void *v, loff_t *_pos)
-{
- struct list_head *p = v;
- unsigned long selector = *_pos >> ITERATOR_SHIFT;
-
- (*_pos)++;
- switch (selector) {
- case 0x0:
- return slow_work_runqueue_index(m, _pos);
-
- case 0x1:
- if (*_pos >> ITERATOR_SHIFT == 0x1) {
- p = p->next;
- if (p != &slow_work_queue)
- return p;
- }
- *_pos = 0x2UL << ITERATOR_SHIFT;
- p = &vslow_work_queue;
-
- case 0x2:
- if (*_pos >> ITERATOR_SHIFT == 0x2) {
- p = p->next;
- if (p != &vslow_work_queue)
- return p;
- }
- *_pos = 0x3UL << ITERATOR_SHIFT;
-
- default:
- return NULL;
- }
-}
-
-/*
- * clean up after reading
- */
-static void slow_work_runqueue_stop(struct seq_file *m, void *v)
-{
- spin_unlock_irq(&slow_work_queue_lock);
-}
-
-static const struct seq_operations slow_work_runqueue_ops = {
- .start = slow_work_runqueue_start,
- .stop = slow_work_runqueue_stop,
- .next = slow_work_runqueue_next,
- .show = slow_work_runqueue_show,
-};
-
-/*
- * open "/sys/kernel/debug/slow_work/runqueue" to list queue contents
- */
-static int slow_work_runqueue_open(struct inode *inode, struct file *file)
-{
- return seq_open(file, &slow_work_runqueue_ops);
-}
-
-const struct file_operations slow_work_runqueue_fops = {
- .owner = THIS_MODULE,
- .open = slow_work_runqueue_open,
- .read = seq_read,
- .llseek = seq_lseek,
- .release = seq_release,
-};
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
deleted file mode 100644
index 7494bbf..0000000
--- a/kernel/slow-work.c
+++ /dev/null
@@ -1,1068 +0,0 @@
-/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
- *
- * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhow...@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- *
- * See Documentation/slow-work.txt
- */
-
-#include <linux/module.h>
-#include <linux/slow-work.h>
-#include <linux/kthread.h>
-#include <linux/freezer.h>
-#include <linux/wait.h>
-#include <linux/debugfs.h>
-#include "slow-work.h"
-
-static void slow_work_cull_timeout(unsigned long);
-static void slow_work_oom_timeout(unsigned long);
-
-#ifdef CONFIG_SYSCTL
-static int slow_work_min_threads_sysctl(struct ctl_table *, int,
- void __user *, size_t *, loff_t *);
-
-static int slow_work_max_threads_sysctl(struct ctl_table *, int ,
- void __user *, size_t *, loff_t *);
-#endif
-
-/*
- * The pool of threads has at least min threads in it as long as someone is
- * using the facility, and may have as many as max.
- *
- * A portion of the pool may be processing very slow operations.
- */
-static unsigned slow_work_min_threads = 2;
-static unsigned slow_work_max_threads = 4;
-static unsigned vslow_work_proportion = 50; /* % of threads that may process
- * very slow work */
-
-#ifdef CONFIG_SYSCTL
-static const int slow_work_min_min_threads = 2;
-static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
-static const int slow_work_min_vslow = 1;
-static const int slow_work_max_vslow = 99;
-
-ctl_table slow_work_sysctls[] = {
- {
- .procname = "min-threads",
- .data = &slow_work_min_threads,
- .maxlen = sizeof(unsigned),
- .mode = 0644,
- .proc_handler = slow_work_min_threads_sysctl,
- .extra1 = (void *) &slow_work_min_min_threads,
- .extra2 = &slow_work_max_threads,
- },
- {
- .procname = "max-threads",
- .data = &slow_work_max_threads,
- .maxlen = sizeof(unsigned),
- .mode = 0644,
- .proc_handler = slow_work_max_threads_sysctl,
- .extra1 = &slow_work_min_threads,
- .extra2 = (void *) &slow_work_max_max_threads,
- },
- {
- .procname = "vslow-percentage",
- .data = &vslow_work_proportion,
- .maxlen = sizeof(unsigned),
- .mode = 0644,
- .proc_handler = proc_dointvec_minmax,
- .extra1 = (void *) &slow_work_min_vslow,
- .extra2 = (void *) &slow_work_max_vslow,
- },
- {}
-};
-#endif
-
-/*
- * The active state of the thread pool
- */
-static atomic_t slow_work_thread_count;
-static atomic_t vslow_work_executing_count;
-
-static bool slow_work_may_not_start_new_thread;
-static bool slow_work_cull; /* cull a thread due to lack of activity */
-static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
-static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
-static struct slow_work slow_work_new_thread; /* new thread starter */
-
-/*
- * slow work ID allocation (use slow_work_queue_lock)
- */
-static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
-
-/*
- * Unregistration tracking to prevent put_ref() from disappearing during module
- * unload
- */
-#ifdef CONFIG_MODULES
-static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
-static struct module *slow_work_unreg_module;
-static struct slow_work *slow_work_unreg_work_item;
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
-static DEFINE_MUTEX(slow_work_unreg_sync_lock);
-
-static void slow_work_set_thread_processing(int id, struct slow_work *work)
-{
- if (work)
- slow_work_thread_processing[id] = work->owner;
-}
-static void slow_work_done_thread_processing(int id, struct slow_work *work)
-{
- struct module *module = slow_work_thread_processing[id];
-
- slow_work_thread_processing[id] = NULL;
- smp_mb();
- if (slow_work_unreg_work_item == work ||
- slow_work_unreg_module == module)
- wake_up_all(&slow_work_unreg_wq);
-}
-static void slow_work_clear_thread_processing(int id)
-{
- slow_work_thread_processing[id] = NULL;
-}
-#else
-static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
-static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
-static void slow_work_clear_thread_processing(int id) {}
-#endif
-
-/*
- * Data for tracking currently executing items for indication through /proc
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
-pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
-DEFINE_RWLOCK(slow_work_execs_lock);
-#endif
-
-/*
- * The queues of work items and the lock governing access to them. These are
- * shared between all the CPUs. It doesn't make sense to have per-CPU queues
- * as the number of threads bears no relation to the number of CPUs.
- *
- * There are two queues of work items: one for slow work items, and one for
- * very slow work items.
- */
-LIST_HEAD(slow_work_queue);
-LIST_HEAD(vslow_work_queue);
-DEFINE_SPINLOCK(slow_work_queue_lock);
-
-/*
- * The following are two wait queues that get pinged when a work item is placed
- * on an empty queue. These allow work items that are hogging a thread by
- * sleeping in a way that could be deferred to yield their thread and enqueue
- * themselves.
- */
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
-static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
-
-/*
- * The thread controls. A variable used to signal to the threads that they
- * should exit when the queue is empty, a waitqueue used by the threads to wait
- * for signals, and a completion set by the last thread to exit.
- */
-static bool slow_work_threads_should_exit;
-static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
-static DECLARE_COMPLETION(slow_work_last_thread_exited);
-
-/*
- * The number of users of the thread pool and its lock. Whilst this is zero we
- * have no threads hanging around, and when this reaches zero, we wait for all
- * active or queued work items to complete and kill all the threads we do have.
- */
-static int slow_work_user_count;
-static DEFINE_MUTEX(slow_work_user_lock);
-
-static inline int slow_work_get_ref(struct slow_work *work)
-{
- if (work->ops->get_ref)
- return work->ops->get_ref(work);
-
- return 0;
-}
-
-static inline void slow_work_put_ref(struct slow_work *work)
-{
- if (work->ops->put_ref)
- work->ops->put_ref(work);
-}
-
-/*
- * Calculate the maximum number of active threads in the pool that are
- * permitted to process very slow work items.
- *
- * The answer is rounded up to at least 1, but may not equal or exceed the
- * maximum number of the threads in the pool. This means we always have at
- * least one thread that can process slow work items, and we always have at
- * least one thread that won't get tied up doing so.
- */
-static unsigned slow_work_calc_vsmax(void)
-{
- unsigned vsmax;
-
- vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
- vsmax /= 100;
- vsmax = max(vsmax, 1U);
- return min(vsmax, slow_work_max_threads - 1);
-}
-
-/*
- * Attempt to execute stuff queued on a slow thread. Return true if we managed
- * it, false if there was nothing to do.
- */
-static noinline bool slow_work_execute(int id)
-{
- struct slow_work *work = NULL;
- unsigned vsmax;
- bool very_slow;
-
- vsmax = slow_work_calc_vsmax();
-
- /* see if we can schedule a new thread to be started if we're not
- * keeping up with the work */
- if (!waitqueue_active(&slow_work_thread_wq) &&
- (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
- atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
- !slow_work_may_not_start_new_thread)
- slow_work_enqueue(&slow_work_new_thread);
-
- /* find something to execute */
- spin_lock_irq(&slow_work_queue_lock);
- if (!list_empty(&vslow_work_queue) &&
- atomic_read(&vslow_work_executing_count) < vsmax) {
- work = list_entry(vslow_work_queue.next,
- struct slow_work, link);
- if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
- BUG();
- list_del_init(&work->link);
- atomic_inc(&vslow_work_executing_count);
- very_slow = true;
- } else if (!list_empty(&slow_work_queue)) {
- work = list_entry(slow_work_queue.next,
- struct slow_work, link);
- if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
- BUG();
- list_del_init(&work->link);
- very_slow = false;
- } else {
- very_slow = false; /* avoid the compiler warning */
- }
-
- slow_work_set_thread_processing(id, work);
- if (work) {
- slow_work_mark_time(work);
- slow_work_begin_exec(id, work);
- }
-
- spin_unlock_irq(&slow_work_queue_lock);
-
- if (!work)
- return false;
-
- if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
- BUG();
-
- /* don't execute if the work is in the process of being cancelled */
- if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
- work->ops->execute(work);
-
- if (very_slow)
- atomic_dec(&vslow_work_executing_count);
- clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
-
- /* wake up anyone waiting for this work to be complete */
- wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
-
- slow_work_end_exec(id, work);
-
- /* if someone tried to enqueue the item whilst we were executing it,
- * then it'll be left unenqueued to avoid multiple threads trying to
- * execute it simultaneously
- *
- * there is, however, a race between us testing the pending flag and
- * getting the spinlock, and between the enqueuer setting the pending
- * flag and getting the spinlock, so we use a deferral bit to tell us
- * if the enqueuer got there first
- */
- if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
- spin_lock_irq(&slow_work_queue_lock);
-
- if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
- test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
- goto auto_requeue;
-
- spin_unlock_irq(&slow_work_queue_lock);
- }
-
- /* sort out the race between module unloading and put_ref() */
- slow_work_put_ref(work);
- slow_work_done_thread_processing(id, work);
-
- return true;
-
-auto_requeue:
- /* we must complete the enqueue operation
- * - we transfer our ref on the item back to the appropriate queue
- * - don't wake another thread up as we're awake already
- */
- slow_work_mark_time(work);
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
- list_add_tail(&work->link, &vslow_work_queue);
- else
- list_add_tail(&work->link, &slow_work_queue);
- spin_unlock_irq(&slow_work_queue_lock);
- slow_work_clear_thread_processing(id);
- return true;
-}
-
-/**
- * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
- * work: The work item under execution that wants to sleep
- * _timeout: Scheduler sleep timeout
- *
- * Allow a requeueable work item to sleep on a slow-work processor thread until
- * that thread is needed to do some other work or the sleep is interrupted by
- * some other event.
- *
- * The caller must set up a wake up event before calling this and must have set
- * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
- * condition before calling this function as no test is made here.
- *
- * False is returned if there is nothing on the queue; true is returned if the
- * work item should be requeued
- */
-bool slow_work_sleep_till_thread_needed(struct slow_work *work,
- signed long *_timeout)
-{
- wait_queue_head_t *wfo_wq;
- struct list_head *queue;
-
- DEFINE_WAIT(wait);
-
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
- wfo_wq = &vslow_work_queue_waits_for_occupation;
- queue = &vslow_work_queue;
- } else {
- wfo_wq = &slow_work_queue_waits_for_occupation;
- queue = &slow_work_queue;
- }
-
- if (!list_empty(queue))
- return true;
-
- add_wait_queue_exclusive(wfo_wq, &wait);
- if (list_empty(queue))
- *_timeout = schedule_timeout(*_timeout);
- finish_wait(wfo_wq, &wait);
-
- return !list_empty(queue);
-}
-EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
-
-/**
- * slow_work_enqueue - Schedule a slow work item for processing
- * @work: The work item to queue
- *
- * Schedule a slow work item for processing. If the item is already undergoing
- * execution, this guarantees not to re-enter the execution routine until the
- * first execution finishes.
- *
- * The item is pinned by this function as it retains a reference to it, managed
- * through the item operations. The item is unpinned once it has been
- * executed.
- *
- * An item may hog the thread that is running it for a relatively large amount
- * of time, sufficient, for example, to perform several lookup, mkdir, create
- * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
- *
- * Conversely, if a number of items are awaiting processing, it may take some
- * time before any given item is given attention. The number of threads in the
- * pool may be increased to deal with demand, but only up to a limit.
- *
- * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
- * the very slow queue, from which only a portion of the threads will be
- * allowed to pick items to execute. This ensures that very slow items won't
- * overly block ones that are just ordinarily slow.
- *
- * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
- * attempted queued)
- */
-int slow_work_enqueue(struct slow_work *work)
-{
- wait_queue_head_t *wfo_wq;
- struct list_head *queue;
- unsigned long flags;
- int ret;
-
- if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
- return -ECANCELED;
-
- BUG_ON(slow_work_user_count <= 0);
- BUG_ON(!work);
- BUG_ON(!work->ops);
-
- /* when honouring an enqueue request, we only promise that we will run
- * the work function in the future; we do not promise to run it once
- * per enqueue request
- *
- * we use the PENDING bit to merge together repeat requests without
- * having to disable IRQs and take the spinlock, whilst still
- * maintaining our promise
- */
- if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
- wfo_wq = &vslow_work_queue_waits_for_occupation;
- queue = &vslow_work_queue;
- } else {
- wfo_wq = &slow_work_queue_waits_for_occupation;
- queue = &slow_work_queue;
- }
-
- spin_lock_irqsave(&slow_work_queue_lock, flags);
-
- if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
- goto cancelled;
-
- /* we promise that we will not attempt to execute the work
- * function in more than one thread simultaneously
- *
- * this, however, leaves us with a problem if we're asked to
- * enqueue the work whilst someone is executing the work
- * function as simply queueing the work immediately means that
- * another thread may try executing it whilst it is already
- * under execution
- *
- * to deal with this, we set the ENQ_DEFERRED bit instead of
- * enqueueing, and the thread currently executing the work
- * function will enqueue the work item when the work function
- * returns and it has cleared the EXECUTING bit
- */
- if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
- set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
- } else {
- ret = slow_work_get_ref(work);
- if (ret < 0)
- goto failed;
- slow_work_mark_time(work);
- list_add_tail(&work->link, queue);
- wake_up(&slow_work_thread_wq);
-
- /* if someone who could be requeued is sleeping on a
- * thread, then ask them to yield their thread */
- if (work->link.prev == queue)
- wake_up(wfo_wq);
- }
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- }
- return 0;
-
-cancelled:
- ret = -ECANCELED;
-failed:
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- return ret;
-}
-EXPORT_SYMBOL(slow_work_enqueue);
-
-static int slow_work_wait(void *word)
-{
- schedule();
- return 0;
-}
-
-/**
- * slow_work_cancel - Cancel a slow work item
- * @work: The work item to cancel
- *
- * This function will cancel a previously enqueued work item. If we cannot
- * cancel the work item, it is guarenteed to have run when this function
- * returns.
- */
-void slow_work_cancel(struct slow_work *work)
-{
- bool wait = true, put = false;
-
- set_bit(SLOW_WORK_CANCELLING, &work->flags);
- smp_mb();
-
- /* if the work item is a delayed work item with an active timer, we
- * need to wait for the timer to finish _before_ getting the spinlock,
- * lest we deadlock against the timer routine
- *
- * the timer routine will leave DELAYED set if it notices the
- * CANCELLING flag in time
- */
- if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
- struct delayed_slow_work *dwork =
- container_of(work, struct delayed_slow_work, work);
- del_timer_sync(&dwork->timer);
- }
-
- spin_lock_irq(&slow_work_queue_lock);
-
- if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
- /* the timer routine aborted or never happened, so we are left
- * holding the timer's reference on the item and should just
- * drop the pending flag and wait for any ongoing execution to
- * finish */
- struct delayed_slow_work *dwork =
- container_of(work, struct delayed_slow_work, work);
-
- BUG_ON(timer_pending(&dwork->timer));
- BUG_ON(!list_empty(&work->link));
-
- clear_bit(SLOW_WORK_DELAYED, &work->flags);
- put = true;
- clear_bit(SLOW_WORK_PENDING, &work->flags);
-
- } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
- !list_empty(&work->link)) {
- /* the link in the pending queue holds a reference on the item
- * that we will need to release */
- list_del_init(&work->link);
- wait = false;
- put = true;
- clear_bit(SLOW_WORK_PENDING, &work->flags);
-
- } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
- /* the executor is holding our only reference on the item, so
- * we merely need to wait for it to finish executing */
- clear_bit(SLOW_WORK_PENDING, &work->flags);
- }
-
- spin_unlock_irq(&slow_work_queue_lock);
-
- /* the EXECUTING flag is set by the executor whilst the spinlock is set
- * and before the item is dequeued - so assuming the above doesn't
- * actually dequeue it, simply waiting for the EXECUTING flag to be
- * released here should be sufficient */
- if (wait)
- wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
- TASK_UNINTERRUPTIBLE);
-
- clear_bit(SLOW_WORK_CANCELLING, &work->flags);
- if (put)
- slow_work_put_ref(work);
-}
-EXPORT_SYMBOL(slow_work_cancel);
-
-/*
- * Handle expiry of the delay timer, indicating that a delayed slow work item
- * should now be queued if not cancelled
- */
-static void delayed_slow_work_timer(unsigned long data)
-{
- wait_queue_head_t *wfo_wq;
- struct list_head *queue;
- struct slow_work *work = (struct slow_work *) data;
- unsigned long flags;
- bool queued = false, put = false, first = false;
-
- if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
- wfo_wq = &vslow_work_queue_waits_for_occupation;
- queue = &vslow_work_queue;
- } else {
- wfo_wq = &slow_work_queue_waits_for_occupation;
- queue = &slow_work_queue;
- }
-
- spin_lock_irqsave(&slow_work_queue_lock, flags);
- if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
- clear_bit(SLOW_WORK_DELAYED, &work->flags);
-
- if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
- /* we discard the reference the timer was holding in
- * favour of the one the executor holds */
- set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
- put = true;
- } else {
- slow_work_mark_time(work);
- list_add_tail(&work->link, queue);
- queued = true;
- if (work->link.prev == queue)
- first = true;
- }
- }
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- if (put)
- slow_work_put_ref(work);
- if (first)
- wake_up(wfo_wq);
- if (queued)
- wake_up(&slow_work_thread_wq);
-}
-
-/**
- * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
- * @dwork: The delayed work item to queue
- * @delay: When to start executing the work, in jiffies from now
- *
- * This is similar to slow_work_enqueue(), but it adds a delay before the work
- * is actually queued for processing.
- *
- * The item can have delayed processing requested on it whilst it is being
- * executed. The delay will begin immediately, and if it expires before the
- * item finishes executing, the item will be placed back on the queue when it
- * has done executing.
- */
-int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
- unsigned long delay)
-{
- struct slow_work *work = &dwork->work;
- unsigned long flags;
- int ret;
-
- if (delay == 0)
- return slow_work_enqueue(&dwork->work);
-
- BUG_ON(slow_work_user_count <= 0);
- BUG_ON(!work);
- BUG_ON(!work->ops);
-
- if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
- return -ECANCELED;
-
- if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
- spin_lock_irqsave(&slow_work_queue_lock, flags);
-
- if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
- goto cancelled;
-
- /* the timer holds a reference whilst it is pending */
- ret = work->ops->get_ref(work);
- if (ret < 0)
- goto cant_get_ref;
-
- if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
- BUG();
- dwork->timer.expires = jiffies + delay;
- dwork->timer.data = (unsigned long) work;
- dwork->timer.function = delayed_slow_work_timer;
- add_timer(&dwork->timer);
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- }
-
- return 0;
-
-cancelled:
- ret = -ECANCELED;
-cant_get_ref:
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- return ret;
-}
-EXPORT_SYMBOL(delayed_slow_work_enqueue);
-
-/*
- * Schedule a cull of the thread pool at some time in the near future
- */
-static void slow_work_schedule_cull(void)
-{
- mod_timer(&slow_work_cull_timer,
- round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
-}
-
-/*
- * Worker thread culling algorithm
- */
-static bool slow_work_cull_thread(void)
-{
- unsigned long flags;
- bool do_cull = false;
-
- spin_lock_irqsave(&slow_work_queue_lock, flags);
-
- if (slow_work_cull) {
- slow_work_cull = false;
-
- if (list_empty(&slow_work_queue) &&
- list_empty(&vslow_work_queue) &&
- atomic_read(&slow_work_thread_count) >
- slow_work_min_threads) {
- slow_work_schedule_cull();
- do_cull = true;
- }
- }
-
- spin_unlock_irqrestore(&slow_work_queue_lock, flags);
- return do_cull;
-}
-
-/*
- * Determine if there is slow work available for dispatch
- */
-static inline bool slow_work_available(int vsmax)
-{
- return !list_empty(&slow_work_queue) ||
- (!list_empty(&vslow_work_queue) &&
- atomic_read(&vslow_work_executing_count) < vsmax);
-}
-
-/*
- * Worker thread dispatcher
- */
-static int slow_work_thread(void *_data)
-{
- int vsmax, id;
-
- DEFINE_WAIT(wait);
-
- set_freezable();
- set_user_nice(current, -5);
-
- /* allocate ourselves an ID */
- spin_lock_irq(&slow_work_queue_lock);
- id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
- BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
- __set_bit(id, slow_work_ids);
- slow_work_set_thread_pid(id, current->pid);
- spin_unlock_irq(&slow_work_queue_lock);
-
- sprintf(current->comm, "kslowd%03u", id);
-
- for (;;) {
- vsmax = vslow_work_proportion;
- vsmax *= atomic_read(&slow_work_thread_count);
- vsmax /= 100;
-
- prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
- TASK_INTERRUPTIBLE);
- if (!freezing(current) &&
- !slow_work_threads_should_exit &&
- !slow_work_available(vsmax) &&
- !slow_work_cull)
- schedule();
- finish_wait(&slow_work_thread_wq, &wait);
-
- try_to_freeze();
-
- vsmax = vslow_work_proportion;
- vsmax *= atomic_read(&slow_work_thread_count);
- vsmax /= 100;
-
- if (slow_work_available(vsmax) && slow_work_execute(id)) {
- cond_resched();
- if (list_empty(&slow_work_queue) &&
- list_empty(&vslow_work_queue) &&
- atomic_read(&slow_work_thread_count) >
- slow_work_min_threads)
- slow_work_schedule_cull();
- continue;
- }
-
- if (slow_work_threads_should_exit)
- break;
-
- if (slow_work_cull && slow_work_cull_thread())
- break;
- }
-
- spin_lock_irq(&slow_work_queue_lock);
- slow_work_set_thread_pid(id, 0);
- __clear_bit(id, slow_work_ids);
- spin_unlock_irq(&slow_work_queue_lock);
-
- if (atomic_dec_and_test(&slow_work_thread_count))
- complete_and_exit(&slow_work_last_thread_exited, 0);
- return 0;
-}
-
-/*
- * Handle thread cull timer expiration
- */
-static void slow_work_cull_timeout(unsigned long data)
-{
- slow_work_cull = true;
- wake_up(&slow_work_thread_wq);
-}
-
-/*
- * Start a new slow work thread
- */
-static void slow_work_new_thread_execute(struct slow_work *work)
-{
- struct task_struct *p;
-
- if (slow_work_threads_should_exit)
- return;
-
- if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
- return;
-
- if (!mutex_trylock(&slow_work_user_lock))
- return;
-
- slow_work_may_not_start_new_thread = true;
- atomic_inc(&slow_work_thread_count);
- p = kthread_run(slow_work_thread, NULL, "kslowd");
- if (IS_ERR(p)) {
- printk(KERN_DEBUG "Slow work thread pool: OOM\n");
- if (atomic_dec_and_test(&slow_work_thread_count))
- BUG(); /* we're running on a slow work thread... */
- mod_timer(&slow_work_oom_timer,
- round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
- } else {
- /* ratelimit the starting of new threads */
- mod_timer(&slow_work_oom_timer, jiffies + 1);
- }
-
- mutex_unlock(&slow_work_user_lock);
-}
-
-static const struct slow_work_ops slow_work_new_thread_ops = {
- .owner = THIS_MODULE,
- .execute = slow_work_new_thread_execute,
-#ifdef CONFIG_SLOW_WORK_DEBUG
- .desc = slow_work_new_thread_desc,
-#endif
-};
-
-/*
- * post-OOM new thread start suppression expiration
- */
-static void slow_work_oom_timeout(unsigned long data)
-{
- slow_work_may_not_start_new_thread = false;
-}
-
-#ifdef CONFIG_SYSCTL
-/*
- * Handle adjustment of the minimum number of threads
- */
-static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
- void __user *buffer,
- size_t *lenp, loff_t *ppos)
-{
- int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
- int n;
-
- if (ret == 0) {
- mutex_lock(&slow_work_user_lock);
- if (slow_work_user_count > 0) {
- /* see if we need to start or stop threads */
- n = atomic_read(&slow_work_thread_count) -
- slow_work_min_threads;
-
- if (n < 0 && !slow_work_may_not_start_new_thread)
- slow_work_enqueue(&slow_work_new_thread);
- else if (n > 0)
- slow_work_schedule_cull();
- }
- mutex_unlock(&slow_work_user_lock);
- }
-
- return ret;
-}
-
-/*
- * Handle adjustment of the maximum number of threads
- */
-static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
- void __user *buffer,
- size_t *lenp, loff_t *ppos)
-{
- int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
- int n;
-
- if (ret == 0) {
- mutex_lock(&slow_work_user_lock);
- if (slow_work_user_count > 0) {
- /* see if we need to stop threads */
- n = slow_work_max_threads -
- atomic_read(&slow_work_thread_count);
-
- if (n < 0)
- slow_work_schedule_cull();
- }
- mutex_unlock(&slow_work_user_lock);
- }
-
- return ret;
-}
-#endif /* CONFIG_SYSCTL */
-
-/**
- * slow_work_register_user - Register a user of the facility
- * @module: The module about to make use of the facility
- *
- * Register a user of the facility, starting up the initial threads if there
- * aren't any other users at this point. This will return 0 if successful, or
- * an error if not.
- */
-int slow_work_register_user(struct module *module)
-{
- struct task_struct *p;
- int loop;
-
- mutex_lock(&slow_work_user_lock);
-
- if (slow_work_user_count == 0) {
- printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
- init_completion(&slow_work_last_thread_exited);
-
- slow_work_threads_should_exit = false;
- slow_work_init(&slow_work_new_thread,
- &slow_work_new_thread_ops);
- slow_work_may_not_start_new_thread = false;
- slow_work_cull = false;
-
- /* start the minimum number of threads */
- for (loop = 0; loop < slow_work_min_threads; loop++) {
- atomic_inc(&slow_work_thread_count);
- p = kthread_run(slow_work_thread, NULL, "kslowd");
- if (IS_ERR(p))
- goto error;
- }
- printk(KERN_NOTICE "Slow work thread pool: Ready\n");
- }
-
- slow_work_user_count++;
- mutex_unlock(&slow_work_user_lock);
- return 0;
-
-error:
- if (atomic_dec_and_test(&slow_work_thread_count))
- complete(&slow_work_last_thread_exited);
- if (loop > 0) {
- printk(KERN_ERR "Slow work thread pool:"
- " Aborting startup on ENOMEM\n");
- slow_work_threads_should_exit = true;
- wake_up_all(&slow_work_thread_wq);
- wait_for_completion(&slow_work_last_thread_exited);
- printk(KERN_ERR "Slow work thread pool: Aborted\n");
- }
- mutex_unlock(&slow_work_user_lock);
- return PTR_ERR(p);
-}
-EXPORT_SYMBOL(slow_work_register_user);
-
-/*
- * wait for all outstanding items from the calling module to complete
- * - note that more items may be queued whilst we're waiting
- */
-static void slow_work_wait_for_items(struct module *module)
-{
-#ifdef CONFIG_MODULES
- DECLARE_WAITQUEUE(myself, current);
- struct slow_work *work;
- int loop;
-
- mutex_lock(&slow_work_unreg_sync_lock);
- add_wait_queue(&slow_work_unreg_wq, &myself);
-
- for (;;) {
- spin_lock_irq(&slow_work_queue_lock);
-
- /* first of all, we wait for the last queued item in each list
- * to be processed */
- list_for_each_entry_reverse(work, &vslow_work_queue, link) {
- if (work->owner == module) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- slow_work_unreg_work_item = work;
- goto do_wait;
- }
- }
- list_for_each_entry_reverse(work, &slow_work_queue, link) {
- if (work->owner == module) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- slow_work_unreg_work_item = work;
- goto do_wait;
- }
- }
-
- /* then we wait for the items being processed to finish */
- slow_work_unreg_module = module;
- smp_mb();
- for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
- if (slow_work_thread_processing[loop] == module)
- goto do_wait;
- }
- spin_unlock_irq(&slow_work_queue_lock);
- break; /* okay, we're done */
-
- do_wait:
- spin_unlock_irq(&slow_work_queue_lock);
- schedule();
- slow_work_unreg_work_item = NULL;
- slow_work_unreg_module = NULL;
- }
-
- remove_wait_queue(&slow_work_unreg_wq, &myself);
- mutex_unlock(&slow_work_unreg_sync_lock);
-#endif /* CONFIG_MODULES */
-}
-
-/**
- * slow_work_unregister_user - Unregister a user of the facility
- * @module: The module whose items should be cleared
- *
- * Unregister a user of the facility, killing all the threads if this was the
- * last one.
- *
- * This waits for all the work items belonging to the nominated module to go
- * away before proceeding.
- */
-void slow_work_unregister_user(struct module *module)
-{
- /* first of all, wait for all outstanding items from the calling module
- * to complete */
- if (module)
- slow_work_wait_for_items(module);
-
- /* then we can actually go about shutting down the facility if need
- * be */
- mutex_lock(&slow_work_user_lock);
-
- BUG_ON(slow_work_user_count <= 0);
-
- slow_work_user_count--;
- if (slow_work_user_count == 0) {
- printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
- slow_work_threads_should_exit = true;
- del_timer_sync(&slow_work_cull_timer);
- del_timer_sync(&slow_work_oom_timer);
- wake_up_all(&slow_work_thread_wq);
- wait_for_completion(&slow_work_last_thread_exited);
- printk(KERN_NOTICE "Slow work thread pool:"
- " Shut down complete\n");
- }
-
- mutex_unlock(&slow_work_user_lock);
-}
-EXPORT_SYMBOL(slow_work_unregister_user);
-
-/*
- * Initialise the slow work facility
- */
-static int __init init_slow_work(void)
-{
- unsigned nr_cpus = num_possible_cpus();
-
- if (slow_work_max_threads < nr_cpus)
- slow_work_max_threads = nr_cpus;
-#ifdef CONFIG_SYSCTL
- if (slow_work_max_max_threads < nr_cpus * 2)
- slow_work_max_max_threads = nr_cpus * 2;
-#endif
-#ifdef CONFIG_SLOW_WORK_DEBUG
- {
- struct dentry *dbdir;
-
- dbdir = debugfs_create_dir("slow_work", NULL);
- if (dbdir && !IS_ERR(dbdir))
- debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
- NULL, &slow_work_runqueue_fops);
- }
-#endif
- return 0;
-}
-
-subsys_initcall(init_slow_work);
diff --git a/kernel/slow-work.h b/kernel/slow-work.h
deleted file mode 100644
index 321f3c5..0000000
--- a/kernel/slow-work.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/* Slow work private definitions
- *
- * Copyright (C) 2009 Red Hat, Inc. All Rights Reserved.
- * Written by David Howells (dhow...@redhat.com)
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public Licence
- * as published by the Free Software Foundation; either version
- * 2 of the Licence, or (at your option) any later version.
- */
-
-#define SLOW_WORK_CULL_TIMEOUT (5 * HZ) /* cull threads 5s after running out of
- * things to do */
-#define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after
- * OOM */
-
-#define SLOW_WORK_THREAD_LIMIT 255 /* abs maximum number of slow-work threads */
-
-/*
- * slow-work.c
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-extern struct slow_work *slow_work_execs[];
-extern pid_t slow_work_pids[];
-extern rwlock_t slow_work_execs_lock;
-#endif
-
-extern struct list_head slow_work_queue;
-extern struct list_head vslow_work_queue;
-extern spinlock_t slow_work_queue_lock;
-
-/*
- * slow-work-debugfs.c
- */
-#ifdef CONFIG_SLOW_WORK_DEBUG
-extern const struct file_operations slow_work_runqueue_fops;
-
-extern void slow_work_new_thread_desc(struct slow_work *, struct seq_file *);
-#endif
-
-/*
- * Helper functions
- */
-static inline void slow_work_set_thread_pid(int id, pid_t pid)
-{
-#ifdef CONFIG_SLOW_WORK_PROC
- slow_work_pids[id] = pid;
-#endif
-}
-
-static inline void slow_work_mark_time(struct slow_work *work)
-{
-#ifdef CONFIG_SLOW_WORK_PROC
- work->mark = CURRENT_TIME;
-#endif
-}
-
-static inline void slow_work_begin_exec(int id, struct slow_work *work)
-{
-#ifdef CONFIG_SLOW_WORK_PROC
- slow_work_execs[id] = work;
-#endif
-}
-
-static inline void slow_work_end_exec(int id, struct slow_work *work)
-{
-#ifdef CONFIG_SLOW_WORK_PROC
- write_lock(&slow_work_execs_lock);
- slow_work_execs[id] = NULL;
- write_unlock(&slow_work_execs_lock);
-#endif
-}
diff --git a/kernel/sysctl.c b/kernel/sysctl.c
index 8a68b24..dbba676 100644
--- a/kernel/sysctl.c
+++ b/kernel/sysctl.c
@@ -48,7 +48,6 @@
#include <linux/acpi.h>
#include <linux/reboot.h>
#include <linux/ftrace.h>
-#include <linux/slow-work.h>
#include <linux/perf_event.h>

#include <asm/uaccess.h>
@@ -888,13 +887,6 @@ static struct ctl_table kern_table[] = {
.proc_handler = proc_dointvec,
},
#endif
-#ifdef CONFIG_SLOW_WORK
- {
- .procname = "slow-work",
- .mode = 0555,
- .child = slow_work_sysctls,
- },
-#endif
#ifdef CONFIG_PERF_EVENTS
{
.procname = "perf_event_paranoid",

Tejun Heo

unread,
Jan 17, 2010, 8:00:04 PM1/17/10
to
If called after sched_class chooses a CPU which isn't in a task's
cpus_allowed mask, select_fallback_rq() can end up migrating a task
which is bound to a !active but online cpu to an active cpu. This is
dangerous because active is cleared before CPU_DOWN_PREPARE is called
and subsystems expect affinities of kthreads and other tasks to be
maintained till their CPU_DOWN_PREPARE callbacks are complete.

Consult cpu_online_mask instead.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Peter Zijlstra <pet...@infradead.org>
Cc: Ingo Molnar <mi...@elte.hu>
---
kernel/sched.c | 5 +++--
1 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/kernel/sched.c b/kernel/sched.c
index c535cc4..09d97e3 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -2288,12 +2288,12 @@ static int select_fallback_rq(int cpu, struct task_struct *p)
const struct cpumask *nodemask = cpumask_of_node(cpu_to_node(cpu));

/* Look for allowed, online CPU in same node. */
- for_each_cpu_and(dest_cpu, nodemask, cpu_active_mask)
+ for_each_cpu_and(dest_cpu, nodemask, cpu_online_mask)
if (cpumask_test_cpu(dest_cpu, &p->cpus_allowed))
return dest_cpu;

/* Any allowed, online CPU? */
- dest_cpu = cpumask_any_and(&p->cpus_allowed, cpu_active_mask);
+ dest_cpu = cpumask_any_and(&p->cpus_allowed, cpu_online_mask);
if (dest_cpu < nr_cpu_ids)
return dest_cpu;

@@ -2302,6 +2302,7 @@ static int select_fallback_rq(int cpu, struct task_struct *p)
rcu_read_lock();
cpuset_cpus_allowed_locked(p, &p->cpus_allowed);
rcu_read_unlock();
+ /* breaking affinity, consider active mask instead */
dest_cpu = cpumask_any_and(cpu_active_mask, &p->cpus_allowed);

/*

Tejun Heo

unread,
Jan 17, 2010, 8:00:02 PM1/17/10
to
Workqueue can now handle high concurrency. Use system_single_wq
instead of slow-work.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Steve French <sfr...@samba.org>
---
fs/cifs/Kconfig | 1 -
fs/cifs/cifsfs.c | 6 +-----
fs/cifs/cifsglob.h | 8 +++++---
fs/cifs/dir.c | 2 +-
fs/cifs/file.c | 22 +++++-----------------
fs/cifs/misc.c | 15 +++++++--------
6 files changed, 19 insertions(+), 35 deletions(-)

diff --git a/fs/cifs/Kconfig b/fs/cifs/Kconfig
index 80f3525..6994a0f 100644
--- a/fs/cifs/Kconfig
+++ b/fs/cifs/Kconfig
@@ -2,7 +2,6 @@ config CIFS
tristate "CIFS support (advanced network filesystem, SMBFS successor)"
depends on INET
select NLS
- select SLOW_WORK
help
This is the client VFS module for the Common Internet File System
(CIFS) protocol which is the successor to the Server Message Block
diff --git a/fs/cifs/cifsfs.c b/fs/cifs/cifsfs.c
index 8c6a036..461a3a7 100644
--- a/fs/cifs/cifsfs.c
+++ b/fs/cifs/cifsfs.c
@@ -1038,15 +1038,10 @@ init_cifs(void)
if (rc)
goto out_unregister_key_type;
#endif
- rc = slow_work_register_user(THIS_MODULE);
- if (rc)
- goto out_unregister_resolver_key;

return 0;

- out_unregister_resolver_key:
#ifdef CONFIG_CIFS_DFS_UPCALL
- unregister_key_type(&key_type_dns_resolver);
out_unregister_key_type:
#endif
#ifdef CONFIG_CIFS_UPCALL
@@ -1069,6 +1064,7 @@ static void __exit
exit_cifs(void)
{
cFYI(DBG2, ("exit_cifs"));
+ flush_workqueue(system_single_wq);
cifs_proc_clean();
#ifdef CONFIG_CIFS_DFS_UPCALL
cifs_dfs_release_automount_timer();
diff --git a/fs/cifs/cifsglob.h b/fs/cifs/cifsglob.h
index 4b35f7e..c645843 100644
--- a/fs/cifs/cifsglob.h
+++ b/fs/cifs/cifsglob.h
@@ -18,7 +18,7 @@
*/
#include <linux/in.h>
#include <linux/in6.h>
-#include <linux/slow-work.h>
+#include <linux/workqueue.h>
#include "cifs_fs_sb.h"
#include "cifsacl.h"
/*
@@ -356,7 +356,7 @@ struct cifsFileInfo {
atomic_t count; /* reference count */
struct mutex fh_mutex; /* prevents reopen race after dead ses*/
struct cifs_search_info srch_inf;
- struct slow_work oplock_break; /* slow_work job for oplock breaks */
+ struct work_struct oplock_break; /* work for oplock breaks */
};

/* Take a reference on the file private data */
@@ -723,4 +723,6 @@ GLOBAL_EXTERN unsigned int cifs_min_rcv; /* min size of big ntwrk buf pool */
GLOBAL_EXTERN unsigned int cifs_min_small; /* min size of small buf pool */
GLOBAL_EXTERN unsigned int cifs_max_pending; /* MAX requests at once to server*/

-extern const struct slow_work_ops cifs_oplock_break_ops;
+void cifs_oplock_break(struct work_struct *work);
+void cifs_oplock_break_get(struct cifsFileInfo *cfile);
+void cifs_oplock_break_put(struct cifsFileInfo *cfile);
diff --git a/fs/cifs/dir.c b/fs/cifs/dir.c
index 6ccf726..3c6f9b2 100644
--- a/fs/cifs/dir.c
+++ b/fs/cifs/dir.c
@@ -157,7 +157,7 @@ cifs_new_fileinfo(struct inode *newinode, __u16 fileHandle,
mutex_init(&pCifsFile->lock_mutex);
INIT_LIST_HEAD(&pCifsFile->llist);
atomic_set(&pCifsFile->count, 1);
- slow_work_init(&pCifsFile->oplock_break, &cifs_oplock_break_ops);
+ INIT_WORK(&pCifsFile->oplock_break, cifs_oplock_break);

write_lock(&GlobalSMBSeslock);
list_add(&pCifsFile->tlist, &cifs_sb->tcon->openFileList);
diff --git a/fs/cifs/file.c b/fs/cifs/file.c
index 057e1da..1c5fdf9 100644
--- a/fs/cifs/file.c
+++ b/fs/cifs/file.c
@@ -2276,8 +2276,7 @@ out:
return rc;
}

-static void
-cifs_oplock_break(struct slow_work *work)
+void cifs_oplock_break(struct work_struct *work)
{
struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
oplock_break);
@@ -2316,33 +2315,22 @@ cifs_oplock_break(struct slow_work *work)
LOCKING_ANDX_OPLOCK_RELEASE, false);
cFYI(1, ("Oplock release rc = %d", rc));
}
+
+ cifs_oplock_break_put(cfile);
}

-static int
-cifs_oplock_break_get(struct slow_work *work)
+void cifs_oplock_break_get(struct cifsFileInfo *cfile)
{
- struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
- oplock_break);
mntget(cfile->mnt);
cifsFileInfo_get(cfile);
- return 0;
}

-static void
-cifs_oplock_break_put(struct slow_work *work)
+void cifs_oplock_break_put(struct cifsFileInfo *cfile)
{
- struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
- oplock_break);
mntput(cfile->mnt);
cifsFileInfo_put(cfile);
}

-const struct slow_work_ops cifs_oplock_break_ops = {
- .get_ref = cifs_oplock_break_get,
- .put_ref = cifs_oplock_break_put,
- .execute = cifs_oplock_break,
-};
-
const struct address_space_operations cifs_addr_ops = {
.readpage = cifs_readpage,
.readpages = cifs_readpages,
diff --git a/fs/cifs/misc.c b/fs/cifs/misc.c
index d27d4ec..a2cf7d2 100644
--- a/fs/cifs/misc.c
+++ b/fs/cifs/misc.c
@@ -499,7 +499,6 @@ is_valid_oplock_break(struct smb_hdr *buf, struct TCP_Server_Info *srv)
struct cifsTconInfo *tcon;
struct cifsInodeInfo *pCifsInode;
struct cifsFileInfo *netfile;
- int rc;

cFYI(1, ("Checking for oplock break or dnotify response"));
if ((pSMB->hdr.Command == SMB_COM_NT_TRANSACT) &&
@@ -584,13 +583,13 @@ is_valid_oplock_break(struct smb_hdr *buf, struct TCP_Server_Info *srv)
pCifsInode->clientCanCacheAll = false;
if (pSMB->OplockLevel == 0)
pCifsInode->clientCanCacheRead = false;
- rc = slow_work_enqueue(&netfile->oplock_break);
- if (rc) {
- cERROR(1, ("failed to enqueue oplock "
- "break: %d\n", rc));
- } else {
- netfile->oplock_break_cancelled = false;
- }
+
+ cifs_oplock_break_get(netfile);
+ if (!queue_work(system_single_wq,
+ &netfile->oplock_break))
+ cifs_oplock_break_put(netfile);
+ netfile->oplock_break_cancelled = false;
+
read_unlock(&GlobalSMBSeslock);
read_unlock(&cifs_tcp_ses_lock);
return true;

Andy Walls

unread,
Jan 17, 2010, 10:00:01 PM1/17/10
to
On Mon, 2010-01-18 at 09:57 +0900, Tejun Heo wrote:
> Implement work_busy() which tests whether a work is either pending or
> running. The test isn't synchronized against workqueue operation and
> the result is only good as advisory hints or for debugging.
>
> Signed-off-by: Tejun Heo <t...@kernel.org>

Hi Tejun,

From a driver writer's perspective, this function is not useful since it is
unreliable (false positives only?) and I have no way of

"ensuring the workqueue @work was last queued on stays valid until this
function returns."

I don't quite know how to check and enforce a workqueue's continuing
validity across the function call. (Maybe you could clarify?)

As a driver writer, I'd do one of two things to reliably know when a
work is "not busy":

1. mark work objects which I submitted with an atomic_t or bit flags:

struct foo_work {
struct work_struct work;
atomic_t dispatched;
struct foo_instance *foo;
};

struct foo_instance {
...
struct foo_work work_object[5];
...
};

The irq_handler finds a work_object[] that is not dispatched, marks it
dispatched, and schedules the work. The work handler will clear the
dispatched field when it is done with the work object. A busy work
object will have dispatched set, a non-busy work will not, and
dispatched can be checked atomically.

This still can suffer from false positives for "busy", but it functions
as long as the work_object[] exists vs. the workqueue validity criterion
(which isn't clear to me). The driver has direct control of when the
work_object[] array will be valid.

Or
2. Just schedule the work object and check the return value to see if
the submission succeeded. If it did, the work was "not pending". This
method can't check for "running" of course.

Is there some specific use case where this function is very useful
despite being unreliable? I just think it's asking for abuse by someone
who would think "mostly reliable" is good enough, when it actually may
not be.

Regards,
Andy

--

Tejun Heo

unread,
Jan 18, 2010, 12:40:01 AM1/18/10
to
Hello, Andy.

On 01/18/2010 11:52 AM, Andy Walls wrote:
>>From a driver writer's perspective, this function not useful since it is
> unreliable (false positives only?) and I have no way of
>
> "ensuring the workqueue @work was last queued on stays valid until this
> function returns."
>
> I don't quite know how to check and enfore a workqueue's continuing
> validity across the function call. (Maybe you could clarify?)

I don't really think that would be possible without tinkering with
workqueue internal locking.

> 2. Just schedule the work object and check the return value to see if
> the submission suceeded. If it did, the work was "not pending". This
> method can't check for "running" of course.

For workqueue, the above combined with proper subsystem locking would
be the best way to do it, I think.

> Is there some specific use case where this function is very useful
> despite being unreliable? I just think it's asking for abuse by someone
> who would think "mostly reliable" is good enough, when it actually may
> not be.

I mostly just wanted to keep the fscache debug printout which
indicates whether a fscache object has work pending or running. It's
a debug printout so it doesn't need to be reliable. If the debug
printout can be removed, this patch can go too.

Thanks.

--
tejun

Arjan van de Ven

unread,
Jan 18, 2010, 1:10:01 AM1/18/10
to
On Mon, 18 Jan 2010 09:57:44 +0900
Tejun Heo <t...@kernel.org> wrote:

> Now that cmwq can handle high concurrency, there's no reason to
> implement separate thread pool for async. Introduce alternative
> implementation based on workqueue.
>


I'm sorry but I'm really not happy with this conversion;
it loses the very nice property of being able to execute and
synchronize between places at the end just before device registration.

I don't mind the implementation sharing thread pool with your stuff,
but I really really want to keep the cookie and synchronization
mechanism. There's a bunch of users of that pending and doing things
entirely sequentially just is not going to cut it.


--
Arjan van de Ven Intel Open Source Technology Centre
For development, discussion and tips for power savings,
visit http://www.lesswatts.org

Tejun Heo

unread,
Jan 18, 2010, 3:50:02 AM1/18/10
to
On 01/18/2010 03:01 PM, Arjan van de Ven wrote:
> I'm sorry but I'm really not happy with this conversion;
> it looses the very nice property of being able to execute and
> synchronize between places at the end just before device registration.

Hmm... can you elaborate a bit?

> I don't mind the implementation sharing thread pool with your stuff,
> but I really really want to keep the cookie and synchronization
> mechanism. There's a bunch of users of that pending and doing things
> sequential entirely just is not going to cut it.

For what async is currently used for, I don't think there will be any
noticeable difference. If the proposed implementation is lacking
somewhere, we can definitely improve it although I'm not sure whether
it will end up with the cookie thing.

Thanks.

--
tejun

Steven Whitehouse

unread,
Jan 18, 2010, 4:50:01 AM1/18/10
to
Hi,

On Mon, 2010-01-18 at 09:57 +0900, Tejun Heo wrote:

> Workqueue can now handle high concurrency. Use system_long_wq instead
> of slow-work.
>
> Signed-off-by: Tejun Heo <t...@kernel.org>
> Cc: Steven Whitehouse <swhi...@redhat.com>

Acked-by: Steven Whitehouse <swhi...@redhat.com> on two conditions:

i) That scheduling work on this new workqueue will not require any
GFP_KERNEL allocations (even hidden ones such as starting new threads)
before the work runs. This is required since the recovery code must not
call into the fs until after it's recovered.
ii) That there is no interaction between this workqueue and the
"delayed" workqueue which the glock code uses since the recovery must
not block that workqueue, nor must that workqueue block recovery.

Having read briefly through the other patches, I believe that both those
two conditions are met, but I thought I'd ask too, just to be on the
safe side. Otherwise it looks like a nice clean up,

Steve.

Peter Zijlstra

unread,
Jan 18, 2010, 5:20:01 AM1/18/10
to
On Mon, 2010-01-18 at 09:57 +0900, Tejun Heo wrote:
> If called after sched_class chooses a CPU which isn't in a task's
> cpus_allowed mask,

I can only see this happening when you're changing cpus_allowed after
starting to take down the cpu. IOW you're moving a thread to a dying
cpu.

This is because you're spawning workqueue threads while we're going
down?

Tejun Heo

unread,
Jan 18, 2010, 6:30:01 AM1/18/10
to
Hello,

On 01/18/2010 07:13 PM, Peter Zijlstra wrote:
> On Mon, 2010-01-18 at 09:57 +0900, Tejun Heo wrote:
>> If called after sched_class chooses a CPU which isn't in a task's
>> cpus_allowed mask,
>
> I can only see this happening when you're changing cpus_allowed after
> starting to take down the cpu. IOW you're moving a thread to a dying
> cpu.
>
> This is because you're spawning workqueue threads while we're going
> down?

It got triggered by the hotplug callback creating the trustee kthread
and kthread_bind()ing it to the target cpu during CPU_DOWN_PREPARE.

Thanks.

--
tejun

Tejun Heo

unread,
Jan 18, 2010, 6:30:01 AM1/18/10
to
On 01/18/2010 06:45 PM, Steven Whitehouse wrote:
> Hi,
>
> On Mon, 2010-01-18 at 09:57 +0900, Tejun Heo wrote:
>> Workqueue can now handle high concurrency. Use system_long_wq instead
>> of slow-work.
>>
>> Signed-off-by: Tejun Heo <t...@kernel.org>
>> Cc: Steven Whitehouse <swhi...@redhat.com>
>
> Acked-by: Steven Whitehouse <swhi...@redhat.com> on two conditions:
>
> i) That scheduling work on this new workqueue will not require any
> GFP_KERNEL allocations (even hidden ones such as starting new threads)
> before the work runs. This is required since the recovery code must not
> call into the fs until after its recovered.

Oh, if that's the case, it needs its own wq with a rescuer. I thought
the recovery path wasn't invoked during allocation. slow-work didn't
guarantee such thing either. Anyways, changing that is pretty easy.

Thanks.

--
tejun

Steven Whitehouse

unread,
Jan 18, 2010, 7:10:03 AM1/18/10
to
Hi,

On Mon, 2010-01-18 at 20:24 +0900, Tejun Heo wrote:
> On 01/18/2010 06:45 PM, Steven Whitehouse wrote:
> > Hi,
> >
> > On Mon, 2010-01-18 at 09:57 +0900, Tejun Heo wrote:
> >> Workqueue can now handle high concurrency. Use system_long_wq instead
> >> of slow-work.
> >>
> >> Signed-off-by: Tejun Heo <t...@kernel.org>
> >> Cc: Steven Whitehouse <swhi...@redhat.com>
> >
> > Acked-by: Steven Whitehouse <swhi...@redhat.com> on two conditions:
> >
> > i) That scheduling work on this new workqueue will not require any
> > GFP_KERNEL allocations (even hidden ones such as starting new threads)
> > before the work runs. This is required since the recovery code must not
> > call into the fs until after its recovered.
>
> Oh, if that's the case, it needs its own wq with a rescuer. I thought
> the recovery path wasn't invoked during allocation. slow-work didn't
> guarantee such thing either. Anyways, changing that is pretty easy.
>
> Thanks.
>

Hmm, I thought I'd checked slow work pretty carefully before I decided
to use it :( Looking at it though, it's pretty unlikely that it would
cause a problem. We can be 100% safe by just increasing the number of
slow work threads to one per mounted gfs2 fs (assuming no other slow
work users).

Even then it starts new threads by scheduling slow work and thus it
looks like recovery would run before the slow work to start a new
thread, so it's much less likely to cause a problem than if the new
thread was started before the slow work item was executed. We haven't
seen a problem during testing so far.

Anyway, if it's easy to solve that problem in the new code, that's all
good :-) Thanks for pointing out this issue,

Steve.

Arjan van de Ven

unread,
Jan 18, 2010, 10:30:02 AM1/18/10
to
On Mon, 18 Jan 2010 17:49:39 +0900
Tejun Heo <t...@kernel.org> wrote:

> For what async is currently used for, I don't think there will be any
> noticeable difference. If the proposed implementation is lacking
> somewhere, we can definitely improve it although I'm not sure whether
> it will end up with the cookie thing.

it is not that I do not like your backend implementation. I do not like
the programming model change you're introducing.
The cookie API allows for what is sort of the equivalent of out-of-order
execution that the cpu does. In a very very simple way, you can start
things in an order, then they execute in variable times and in parallel,
and then when the side effects need to become visible (device
registration, whatever), you go back to an in-order model.
I have patches to do this for KMS and we're working on getting
something working for ACPI as well.

Your API/model change gets rid of this conceptually simple programming
model, which makes using it on other places more complex and messy. I
do not see what enormous benefit your patches would have that would
justify complicating the programming model. (And "sharing the thread
pool" is not that; I'm sure it's possible to share the thread pool
without changing the programming model... and it's not that the async
thread pools are that big or complex anyway)

So consider the current patches

NAK-ed-by: Arjan van de Ven <ar...@linux.intel.com>

--
Arjan van de Ven Intel Open Source Technology Centre
For development, discussion and tips for power savings,
visit http://www.lesswatts.org

Arjan van de Ven

unread,
Jan 18, 2010, 8:00:02 PM1/18/10
to
On 1/18/2010 16:57, Tejun Heo wrote:
> Hello, Arjan.

>
> On 01/19/2010 12:25 AM, Arjan van de Ven wrote:
>> Your API/model change gets rid of this conceptually simple programming
>> model, which makes using it on other places more complex and messy. I
>> do not see what enormous benefit your patches would have that would
>> justify complicating the programming model. (And "sharing the thread
>> pool" is not that; I'm sure it's possible to share the thread pool
>> without changing the programming model... and it's not that the async
>> thread pools are that big or complex anyway)
>
> Oh yeah, if you want the cookies, it can be implemented on top of
> workqueue. I'm just not sure whether that has justifiable benefit.
> About the same things can be achived using flushes and if some
> subsystem want low level control, it can simply use the workqueue
> directly implementing whatever level of concurrency as it sees fit and
> flushing as necessary.
>
> As cmwq can provide async contexts flexibly, the benefit of async is
> simplification of simple cases where the caller doesn't have to worry
> about allocating works or whatever. I don't really see much point in
> introducing a different set of sync rules based on cookies when the
> same thing can be achieved using wq interfaces directly and I think
> that having two different models for async synchronization might hurt
> more than help. What type of ordering rules are you currently working
> on using cookies?

there are two types:
there's the domains, where you synchronize only within a domain,
and then there's the "async string", think ACPI.
the ACPI init is a whole series of sort of dependent steps, where you synchronize
about halfway, but the whole set runs async to all other inits in the system, and only
near the very end when a full synchronization is done do you wait.
basically what you get (sorry, lame ascii graph)

*************************************** (main init flow)
*** driver 1
* ** driver 2
* ** driver 3
* ** driver 4
* ** driver 5

where you get maximum concurrency during the pre-synchronization part,
and a "chain" of synchronized execution *as part of the same function flow*,
but possibly independent of other synchronization flows.


the async infrastructure as you say took away the hassle of allocating, and more
importantly, caring for the lifetime of the metadata object. But it also introduced
a synchronization mechanism that is natural and simple for driver init and some other flows.

>
> Thanks.

Tejun Heo

unread,
Jan 18, 2010, 8:00:02 PM1/18/10
to
Hello, Arjan.

On 01/19/2010 12:25 AM, Arjan van de Ven wrote:
> Your API/model change gets rid of this conceptually simple programming
> model, which makes using it on other places more complex and messy. I
> do not see what enormous benefit your patches would have that would
> justify complicating the programming model. (And "sharing the thread
> pool" is not that; I'm sure it's possible to share the thread pool
> without changing the programming model... and it's not that the async
> thread pools are that big or complex anyway)

Oh yeah, if you want the cookies, it can be implemented on top of


workqueue. I'm just not sure whether that has justifiable benefit.
About the same things can be achieved using flushes and if some
subsystem wants low level control, it can simply use the workqueue
directly implementing whatever level of concurrency as it sees fit and
flushing as necessary.

As cmwq can provide async contexts flexibly, the benefit of async is
simplification of simple cases where the caller doesn't have to worry
about allocating works or whatever. I don't really see much point in
introducing a different set of sync rules based on cookies when the
same thing can be achieved using wq interfaces directly and I think
that having two different models for async synchronization might hurt
more than help. What type of ordering rules are you currently working
on using cookies?

Thanks.

--
tejun

Tejun Heo

unread,
Jan 18, 2010, 8:00:03 PM1/18/10
to
Hello,

On 01/18/2010 09:07 PM, Steven Whitehouse wrote:
> Hmm, I thought I'd checked slow work pretty carefully before I decided
> to use it :( Looking at it though, its pretty unlikely that it would
> cause a problem. We can be 100% safe by just increasing the number of
> slow work threads to one per mounted gfs2 fs (assuming no other slow
> work users).

I don't think that will guarantee anything as there's nothing which
guarantees the new additional work to the gfs2 code. At any rate,
these problems are pretty unlikely to happen but they still need
guarantees, so...

> Anyway, if its easy to solve that problem in the new code, thats all
> good :-) Thanks for pointing out this issue,

Yeap, I'll post updated patch soonish.

Thanks.

--
tejun

Tejun Heo

unread,
Jan 19, 2010, 3:00:02 AM1/19/10
to
Hello, Arjan.

On 01/19/2010 09:57 AM, Arjan van de Ven wrote:
> there are two types:
> there's the domains, where you synchronize only within a domain,

Maps pretty nicely to a wq which is a queueing and flushing domain
after all.

> and then there's the "async string", think ACPI. the ACPI init is a
> whole series of sort of dependent steps, where you synchronize about
> halfway, but the whole set runs async to all other inits in the
> system, and only near the very end when a full synchronization is
> done do you wait. basically what you get (sorry, lame ascii graph)
>
> *************************************** (main init flow)
> *** driver 1
> * ** driver 2
> * ** driver 3
> * ** driver 4
> * ** driver 5
>
> where you get maximum concurrency during the pre-synchronization
> part, and a "chain" of synchronized execution *as part of the same
> function flow*, but possibly independent of other synchronization
> flows.

This too can be implemented using wq directly. More below.

> the async infrastructure as you say took away the hassle of
> allocating, and more importantly, caring for the lifetime of the
> metadata object. But it also introduced a sychronization mechanism
> that is natural and simple for driver init and some other flows.

The tradeoff changes now that the worker pool implementation can be
shared with workqueue, which provides its own ways to control concurrency
and synchronize. Before, the cookie based synchronization was something
inherent to the async mechanism. The async worker pool was needed and
the synchronization mechanism came integrated with it. Now that the
backend can be replaced with workqueue which supplies its own ways of
synchronization, the cookie based synchronization model needs stronger
justification as it no longer comes as an integral part of something
bigger which is needed anyway.

I'm sure the cookie based synchronization has its benefits but is the
benefit big enough, or is using workqueue synchronization constructs
difficult enough to justify a completely separate synchronization
model?

If so, we can leave the list based cookie synchronization alone and
simply use wq's to provide concurrency only without using its
synchronization mechanisms (flushes).

As for the current in-kernel users, the simplistic implementation
seems enough to me. Do you think the stuff which is currently being
worked on would benefit a lot from cookie based synchronization
compared to using works and flushes directly?

Thanks.

--
tejun

Tejun Heo

unread,
Jan 19, 2010, 3:50:03 AM1/19/10
to
Workqueue can now handle high concurrency. Convert gfs to use
workqueue instead of slow-work.

* Steven pointed out that recovery path might be run from allocation
path and thus requires forward progress guarantee without memory
allocation. Create and use gfs_recovery_wq with rescuer. Please
note that forward progress wasn't guaranteed with slow-work.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Steven Whitehouse <swhi...@redhat.com>

---
So, here's the updated version. Only compile tested.

Thanks.

fs/gfs2/Kconfig | 1
fs/gfs2/incore.h | 3 --
fs/gfs2/main.c | 14 +++++++------
fs/gfs2/ops_fstype.c | 8 +++----
fs/gfs2/recovery.c | 54 +++++++++++++++++++--------------------------------
fs/gfs2/recovery.h | 6 +++--
fs/gfs2/sys.c | 3 +-
7 files changed, 40 insertions(+), 49 deletions(-)

Index: work/fs/gfs2/incore.h
===================================================================
--- work.orig/fs/gfs2/incore.h
+++ work/fs/gfs2/incore.h


@@ -12,7 +12,6 @@

#include <linux/fs.h>
#include <linux/workqueue.h>
-#include <linux/slow-work.h>
#include <linux/dlm.h>
#include <linux/buffer_head.h>

@@ -383,7 +382,7 @@ struct gfs2_journal_extent {
struct gfs2_jdesc {
struct list_head jd_list;
struct list_head extent_list;
- struct slow_work jd_work;
+ struct work_struct jd_work;
struct inode *jd_inode;
unsigned long jd_flags;
#define JDF_RECOVERY 1

Index: work/fs/gfs2/main.c
===================================================================
--- work.orig/fs/gfs2/main.c
+++ work/fs/gfs2/main.c


@@ -15,7 +15,6 @@
#include <linux/init.h>
#include <linux/gfs2_ondisk.h>
#include <asm/atomic.h>
-#include <linux/slow-work.h>

#include "gfs2.h"
#include "incore.h"

@@ -24,6 +23,7 @@
#include "util.h"
#include "glock.h"
#include "quota.h"
+#include "recovery.h"

static struct shrinker qd_shrinker = {
.shrink = gfs2_shrink_qd_memory,
@@ -114,9 +114,11 @@ static int __init init_gfs2_fs(void)


if (error)
goto fail_unregister;

- error = slow_work_register_user(THIS_MODULE);
- if (error)
- goto fail_slow;

+ error = -ENOMEM;
+ gfs_recovery_wq = __create_workqueue("gfs_recovery", WQ_RESCUER,
+ WQ_MAX_ACTIVE);
+ if (!gfs_recovery_wq)
+ goto fail_wq;

gfs2_register_debugfs();

@@ -124,7 +126,7 @@ static int __init init_gfs2_fs(void)

return 0;

-fail_slow:
+fail_wq:
unregister_filesystem(&gfs2meta_fs_type);
fail_unregister:
unregister_filesystem(&gfs2_fs_type);
@@ -163,7 +165,7 @@ static void __exit exit_gfs2_fs(void)


gfs2_unregister_debugfs();
unregister_filesystem(&gfs2_fs_type);
unregister_filesystem(&gfs2meta_fs_type);
- slow_work_unregister_user(THIS_MODULE);

+ destroy_workqueue(gfs_recovery_wq);

kmem_cache_destroy(gfs2_quotad_cachep);
kmem_cache_destroy(gfs2_rgrpd_cachep);
Index: work/fs/gfs2/ops_fstype.c
===================================================================
--- work.orig/fs/gfs2/ops_fstype.c
+++ work/fs/gfs2/ops_fstype.c


@@ -17,7 +17,6 @@
#include <linux/namei.h>
#include <linux/mount.h>
#include <linux/gfs2_ondisk.h>
-#include <linux/slow-work.h>
#include <linux/quotaops.h>

#include "gfs2.h"
@@ -673,7 +672,7 @@ static int gfs2_jindex_hold(struct gfs2_

break;

INIT_LIST_HEAD(&jd->extent_list);
- slow_work_init(&jd->jd_work, &gfs2_recover_ops);
+ INIT_WORK(&jd->jd_work, gfs2_recover_func);
jd->jd_inode = gfs2_lookupi(sdp->sd_jindex, &name, 1);
if (!jd->jd_inode || IS_ERR(jd->jd_inode)) {
if (!jd->jd_inode)
@@ -778,7 +777,8 @@ static int init_journal(struct gfs2_sbd

if (sdp->sd_lockstruct.ls_first) {
unsigned int x;
for (x = 0; x < sdp->sd_journals; x++) {
- error = gfs2_recover_journal(gfs2_jdesc_find(sdp, x));
+ error = gfs2_recover_journal(gfs2_jdesc_find(sdp, x),
+ true);
if (error) {
fs_err(sdp, "error recovering journal %u: %d\n",
x, error);
@@ -788,7 +788,7 @@ static int init_journal(struct gfs2_sbd

gfs2_others_may_mount(sdp);
} else if (!sdp->sd_args.ar_spectator) {
- error = gfs2_recover_journal(sdp->sd_jdesc);
+ error = gfs2_recover_journal(sdp->sd_jdesc, true);
if (error) {
fs_err(sdp, "error recovering my journal: %d\n", error);
goto fail_jinode_gh;

Index: work/fs/gfs2/recovery.c
===================================================================
--- work.orig/fs/gfs2/recovery.c
+++ work/fs/gfs2/recovery.c


@@ -14,7 +14,6 @@
#include <linux/buffer_head.h>
#include <linux/gfs2_ondisk.h>
#include <linux/crc32.h>
-#include <linux/slow-work.h>

#include "gfs2.h"
#include "incore.h"

@@ -28,6 +27,8 @@
#include "util.h"
#include "dir.h"

+struct workqueue_struct *gfs_recovery_wq;
+
int gfs2_replay_read_block(struct gfs2_jdesc *jd, unsigned int blk,
struct buffer_head **bh)
{
@@ -443,23 +444,7 @@ static void gfs2_recovery_done(struct gf


kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp);
}

-static int gfs2_recover_get_ref(struct slow_work *work)
-{
- struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
- if (test_and_set_bit(JDF_RECOVERY, &jd->jd_flags))
- return -EBUSY;
- return 0;
-}
-
-static void gfs2_recover_put_ref(struct slow_work *work)
-{
- struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
- clear_bit(JDF_RECOVERY, &jd->jd_flags);
- smp_mb__after_clear_bit();
- wake_up_bit(&jd->jd_flags, JDF_RECOVERY);
-}
-
-static void gfs2_recover_work(struct slow_work *work)
+void gfs2_recover_func(struct work_struct *work)
{
struct gfs2_jdesc *jd = container_of(work, struct gfs2_jdesc, jd_work);
struct gfs2_inode *ip = GFS2_I(jd->jd_inode);

@@ -578,7 +563,7 @@ static void gfs2_recover_work(struct slo


gfs2_glock_dq_uninit(&j_gh);

fs_info(sdp, "jid=%u: Done\n", jd->jd_jid);
- return;
+ goto done;

fail_gunlock_tr:
gfs2_glock_dq_uninit(&t_gh);

@@ -590,32 +575,35 @@ fail_gunlock_j:

+ rv = queue_work(gfs_recovery_wq, &jd->jd_work);


+ BUG_ON(!rv);
+
+ if (wait)
+ wait_on_bit(&jd->jd_flags, JDF_RECOVERY, gfs2_recovery_wait,
+ TASK_UNINTERRUPTIBLE);
+
return 0;
}

Index: work/fs/gfs2/recovery.h
===================================================================
--- work.orig/fs/gfs2/recovery.h
+++ work/fs/gfs2/recovery.h
@@ -12,6 +12,8 @@

#include "incore.h"

+extern struct workqueue_struct *gfs_recovery_wq;
+
static inline void gfs2_replay_incr_blk(struct gfs2_sbd *sdp, unsigned int *blk)
{
if (++*blk == sdp->sd_jdesc->jd_blocks)
@@ -27,8 +29,8 @@ extern void gfs2_revoke_clean(struct gfs



extern int gfs2_find_jhead(struct gfs2_jdesc *jd,
struct gfs2_log_header_host *head);
-extern int gfs2_recover_journal(struct gfs2_jdesc *gfs2_jd);
-extern struct slow_work_ops gfs2_recover_ops;
+extern int gfs2_recover_journal(struct gfs2_jdesc *gfs2_jd, bool wait);
+extern void gfs2_recover_func(struct work_struct *work);

#endif /* __RECOVERY_DOT_H__ */

Index: work/fs/gfs2/sys.c
===================================================================
--- work.orig/fs/gfs2/sys.c
+++ work/fs/gfs2/sys.c


@@ -26,6 +26,7 @@
#include "quota.h"
#include "util.h"
#include "glops.h"
+#include "recovery.h"

struct gfs2_attr {
struct attribute attr;
@@ -351,7 +352,7 @@ static ssize_t recover_store(struct gfs2

list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
if (jd->jd_jid != jid)
continue;
- rv = slow_work_enqueue(&jd->jd_work);
+ rv = gfs2_recover_journal(jd, false);
break;
}
out:

Index: work/fs/gfs2/Kconfig
===================================================================
--- work.orig/fs/gfs2/Kconfig
+++ work/fs/gfs2/Kconfig


@@ -7,7 +7,6 @@ config GFS2_FS
select IP_SCTP if DLM_SCTP
select FS_POSIX_ACL
select CRC32
- select SLOW_WORK
select QUOTA
select QUOTACTL
help

Jeff Layton

unread,
Jan 19, 2010, 7:30:02 AM1/19/10
to

This block of code looks problematic. This code is run by the
cifs_demultiplex_thread (cifsd). We can't do an oplock_break_put in
this context, since it might trigger a blocking SMB and cause a
deadlock.

A while back, I backported this code to earlier kernels and used a
standard workqueue there. What I did there was to only do the "get" if
the queue_work succeeded, and then had the queued work take and
immediately drop the GlobalSMBSeslock first thing. Yes, it's ugly, but
it prevented the possible deadlock and didn't require adding anything
like completion vars to the struct.

Also, this change seems to have changed the logic a bit. The
oplock_break_cancelled flag is being set to false unconditionally, and
the printk was dropped. Not a big deal on the last part, but we can't
really do much with errors in this codepath so it might be helpful to
have some indication that there are problems here.

Other than the above problems (which are easily fixable), this patch
seems fine.

--
Jeff Layton <jla...@redhat.com>

Arjan van de Ven

unread,
Jan 19, 2010, 9:40:01 AM1/19/10
to
On Tue, 19 Jan 2010 16:56:46 +0900
Tejun Heo <t...@kernel.org> wrote:

> > where you get maximum concurrency during the pre-synchronization
> > part, and a "chain" of synchronized execution *as part of the same
> > function flow*, but possibly independent of other synchronization
> > flows.
>
> This too can be implemented using wq directly. More below.

however you are forcing the function to be split in pieces,
which makes for a more complex programming model.
For example, I have trouble proving to myself that your ata conversion
is actually correct.


>
> The tradeoff changes when the worker pool implementation can be shared
> with workqueue which provides its own ways to control concurrency and
> synchronize.

while I don't mind sharing the pool implementation (all 20 lines of
it ;-), I don't think the objective of sharing some implementation
detail is worth complicating the programming model.


> Before, the cookie based synchronization is something
> inherent to the async mechanism. The async worker pool was needed and
> the synchronization mechanism came integrated with it. Now that the
> backend can be replaced with workqueue which supplies its own ways of
> synchronization, the cookie based synchronization model needs stronger
> justification as it no longer comes as an integral part of something
> bigger which is needed anyway.

the wq model is either "full async" or "fully ordered".
the cookie mechanism allows for "run async for the expensive bit, and
then INSIDE THE SAME FUNCTION, synchronize, and then run some more".

> If so, we can leave the list based cookie synchronization alone and
> simply use wq's to provide concurrency only without using its
> synchronization mechanisms (flushes).

can you flush from inside a wq element?
That's the critical part that makes the cookie based system easy to
program.

--
Arjan van de Ven Intel Open Source Technology Centre
For development, discussion and tips for power savings,
visit http://www.lesswatts.org

Tejun Heo

unread,
Jan 19, 2010, 7:20:02 PM1/19/10
to
Hello, Arjan.

On 01/19/2010 11:37 PM, Arjan van de Ven wrote:
>> This too can be implemented using wq directly. More below.
>
> however you are forcing the function to be split in pieces,
> which makes for a more complex programming model.
> For example, I have trouble proving to myself that your ata conversion
> is actually correct.

I think it is. :-)

>> The tradeoff changes when the worker pool implementation can be shared
>> with workqueue which provides its own ways to control concurrency and
>> synchronize.
>
> while I don't mind sharing the pool implementation (all 20 lines of
> it ;-), I don't think the objective of sharing some implementation
> detail is worth complicating the programming model.

Oh yeah, we can definitely pay some lines of code for a separate
synchronization model if it makes driver writers' lives easier. I'm
just wondering whether the benefit is enough to justify a separate
sync model.

>> Before, the cookie based synchronization is something
>> inherent to the async mechanism. The async worker pool was needed and
>> the synchronization mechanism came integrated with it. Now that the
>> backend can be replaced with workqueue which supplies its own ways of
>> synchronization, the cookie based synchronization model needs stronger
>> justification as it no longer comes as an integral part of something
>> bigger which is needed anyway.
>
> the wq model is either "full async" or "fully ordered".
> the cookie mechanism allows for "run async for the expensive bit, and
> then INSIDE THE SAME FUNCTION, synchronize, and then run some more".

Hmmm...

>> If so, we can leave the list based cookie synchronization alone and
>> simply use wq's to provide concurrency only without using its
>> synchronization mechanisms (flushes).
>
> can you flush from inside a wq element? That's the critical part
> that makes the cookie based system easy to program.

Yeah, you can flush individual works from other works and wqs from
works running from different wqs. What's not allowed is flushing the
wq a work is running on from the work. Let's say if the flush code
can be modified to do so, would that change your opinion?

Thanks.

--
tejun

Tejun Heo

unread,
Jan 19, 2010, 7:20:02 PM1/19/10
to
Hello,

Okay, thanks for pointing it out.

> A while back, I backported this code to earlier kernels and used a
> standard workqueue there. What I did there was to only do the "get" if
> the queue_work succeeded, and then had the queued work take and
> immediately drop the GlobalSMBSeslock first thing. Yes, it's ugly, but
> it prevented the possible deadlock and didn't require adding anything
> like completion vars to the struct.

Hmmm... Why is locking GlobalSMBSeslock necessary?
cifs_oplock_break_get() can never fail and it seems that
is_valid_oplock_break() should be holding valid reference by the time
it enqueues the work, so wouldn't the following be sufficient?

if (queue_work(system_single_wq, &netfile->oplock_break))
cifs_oplock_break_get(netfile);

> Also, this change seems to have changed the logic a bit. The
> oplock_break_cancelled flag is being set to false unconditionally, and
> the printk was dropped. Not a big deal on the last part, but we can't
> really do much with errors in this codepath so it might be helpful to
> have some indication that there are problems here.

The thing is that slow_work_enqueue() can only fail if getting a
reference fails. In cifs' case, it always succeeds so there's no
failure case to handle there.

> Other than the above problems (which are easily fixable), this patch
> seems fine.

Thanks.

--
tejun

Arjan van de Ven

unread,
Jan 19, 2010, 7:40:02 PM1/19/10
to
On 1/19/2010 16:19, Tejun Heo wrote:

> Yeah, you can flush individual works from other works and wqs from
> works running from different wqs. What's not allowed is flushing the
> wq a work is running on from the work. Let's say if the flush code
> can be modified to do so, would that change your opinion?

once you get "run in parallel, but have an API to wait on everyone who was scheduled before me"...
... that'd be fine ;)

but then you pretty much HAVE the cookie API, even if you don't have an actual cookie.
(just the cookie was an easy way to determine the "before me")

Jeff Layton

unread,
Jan 19, 2010, 8:00:02 PM1/19/10
to

I guess I wasn't sure I could count on that. It seems unlikely that the
work would run before you did the "get", but unlikely races are even
harder to troubleshoot when they do get hit.

FWIW, it's not a terribly hot codepath, so taking and dropping the
spinlock shouldn't be too bad for performance. If you're certain that
we don't need to worry about it though, then maybe we can just skip
doing that.

> > Also, this change seems to have changed the logic a bit. The
> > oplock_break_cancelled flag is being set to false unconditionally, and
> > the printk was dropped. Not a big deal on the last part, but we can't
> > really do much with errors in this codepath so it might be helpful to
> > have some indication that there are problems here.
>
> The thing is that slow_work_enqueue() can only fail if getting a
> reference fails. In cifs' case, it always succeeds so there's no
> failure case to handle there.
>

Ok, but here we're changing this to queue_work. Is that also guaranteed
to succeed here? If so, then dropping the printk is fine. If not, then
I think we should keep it in.

It's been a while since I overhauled this code, so I'll need to look
again at the semantics for the oplock_break_cancelled flag. It may be
ok to just set it unconditionally like this, but I'll need to check
and be sure.

--
Jeff Layton <jla...@redhat.com>

Tejun Heo

unread,
Jan 19, 2010, 8:20:01 PM1/19/10
to
On 01/20/2010 09:56 AM, Jeff Layton wrote:
>> Hmmm... Why is locking GlobalSMBSeslock necessary?
>> cifs_oplock_break_get() can never fail and it seems that
>> is_valid_oplock_break() should be holding valid reference by the time
>> it enqueues the work, so wouldn't the following be sufficient?
>>
>> if (queue_work(system_single_wq, &netfile->oplock_break))
>> cifs_oplock_break_get(netfile);
>>
>
> I guess I wasn't sure I could count on that. It seems unlikely that the
> work would run before you did the "get", but unlikely races are even
> harder to troubleshoot when they do get hit.
>
> FWIW, it's not a terribly hot codepath, so taking and dropping the
> spinlock shouldn't be too bad for performance. If you're certain that
> we don't need to worry about it though, then maybe we can just skip
> doing that.

Oooh, I don't know the code very well and can't guarantee any of that.
Ah, okay, I was confused. Work can run and finish before the
reference is incremented. So, yeap, I'll add the spinlocking in the
work function.

>>> Also, this change seems to have changed the logic a bit. The
>>> oplock_break_cancelled flag is being set to false unconditionally, and
>>> the printk was dropped. Not a big deal on the last part, but we can't
>>> really do much with errors in this codepath so it might be helpful to
>>> have some indication that there are problems here.
>>
>> The thing is that slow_work_enqueue() can only fail if getting a
>> reference fails. In cifs' case, it always succeeds so there's no
>> failure case to handle there.
>>
>
> Ok, but here we're changing this to queue_work. Is that also guaranteed
> to succeed here? If so, then dropping the printk is fine. If not, then
> I think we should keep it in.

Yeap, queue_work() is guaranteed to succeed. The only possible
outcomes are 1. queued or 2. is already queued.

> It's been a while since I overhauled this code, so I'll need to look
> again at the semantics for the oplock_break_cancelled flag. It may be
> ok to just set it unconditionally like this, but I'll need to check
> and be sure.

Thanks.

--
tejun

Tejun Heo

unread,
Jan 19, 2010, 9:10:02 PM1/19/10
to
Hello,

On 01/20/2010 09:31 AM, Arjan van de Ven wrote:
> On 1/19/2010 16:19, Tejun Heo wrote:
>
>> Yeah, you can flush individual works from other works and wqs from
>> works running from different wqs. What's not allowed is flushing the
>> wq a work is running on from the work. Let's say if the flush code
>> can be modified to do so, would that change your opinion?
>
> once you get "run in parallel, but have an API to wait on everyone
> who was scheduled before me"... ... that'd be fine ;)

Cool, I'll give it a shot then. I think it would be better to
adapt the existing interface to the new uses if at all possible.

> but then you pretty much HAVE the cookie API, even if you don't have
> an actual cookie. (just the cookie was an easy way to determine the
> "before me")

Yeap, but then again, whatever we do, all those synchronization
interfaces can be mapped onto each other eventually.

Thanks.

--
tejun

Arjan van de Ven

unread,
Jan 20, 2010, 1:10:01 AM1/20/10
to
On Wed, 20 Jan 2010 11:08:16 +0900
Tejun Heo <t...@kernel.org> wrote:

>
> Yeap, but then again, whatever we do, all those synchronization
> interfaces can be mapped onto each other eventually.

and maybe we need to be smart about this;
for me, sharing the backend implementation (the pool part) makes sense,
although a thread pool really is not much code. But a smart thread pool
may be.

as for interfaces, I really really think it's ok to have different
interfaces for usecases that are very different, as long as the
interfaces are logical in their domain. I'd rather have 2 interfaces, each
logical to their domain, than a forced joined interface that doesn't
really naturally fit either.


--
Arjan van de Ven Intel Open Source Technology Centre
For development, discussion and tips for power savings,
visit http://www.lesswatts.org

Tejun Heo

unread,
Jan 20, 2010, 3:30:02 AM1/20/10
to
Hello,

On 01/20/2010 03:03 PM, Arjan van de Ven wrote:
>> Yeap, but then again, whatever we do, all those synchronization
>> interfaces can be mapped onto each other eventually.

Eh... gave it a shot and it was too complex.

> and maybe we need to be smart about this;
> for me, sharing the backend implementation (the pool part) makes sense,
> although a thread pool really is not much code. But a smart thread pool
> may be.
>
> as for interfaces, I really really think it's ok to have different
> interfaces for usecases that are very different, as long as the
> interfaces are logical in their domain. I'd rather have 2 interfaces, each
> logical to their domain, than a forced joined interface that doesn't
> really naturally fit either.

I'll just replace the backend worker pool for now. If necessary, we
can try to unify the sync model later, I suppose.

Thanks.

--
tejun

Tejun Heo

unread,
Jan 22, 2010, 6:00:02 AM1/22/10
to
Replace private worker pool with system_long_wq.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Arjan van de Ven <ar...@infradead.org>
---
Alright, here's the patch to just convert the worker pool. Boots fine
here.

Thanks.

kernel/async.c | 140 ++++++++-------------------------------------------------
1 file changed, 21 insertions(+), 119 deletions(-)

Index: work/kernel/async.c
===================================================================
--- work.orig/kernel/async.c
+++ work/kernel/async.c
@@ -49,39 +49,31 @@ asynchronous and synchronous parts of th
*/

#include <linux/async.h>
-#include <linux/bug.h>
#include <linux/module.h>
#include <linux/wait.h>
#include <linux/sched.h>
-#include <linux/init.h>
-#include <linux/kthread.h>
-#include <linux/delay.h>
#include <asm/atomic.h>

static async_cookie_t next_cookie = 1;

-#define MAX_THREADS 256
#define MAX_WORK 32768

static LIST_HEAD(async_pending);
static LIST_HEAD(async_running);
static DEFINE_SPINLOCK(async_lock);

-static int async_enabled = 0;
-
struct async_entry {
- struct list_head list;
- async_cookie_t cookie;
- async_func_ptr *func;
- void *data;
- struct list_head *running;
+ struct list_head list;
+ struct work_struct work;
+ async_cookie_t cookie;
+ async_func_ptr *func;
+ void *data;
+ struct list_head *running;
};

static DECLARE_WAIT_QUEUE_HEAD(async_done);
-static DECLARE_WAIT_QUEUE_HEAD(async_new);

static atomic_t entry_count;
-static atomic_t thread_count;

extern int initcall_debug;

@@ -116,27 +108,23 @@ static async_cookie_t lowest_in_progres
spin_unlock_irqrestore(&async_lock, flags);
return ret;
}
+
/*
* pick the first pending entry and run it
*/
-static void run_one_entry(void)
+static void async_run_entry_fn(struct work_struct *work)
{
+ struct async_entry *entry =
+ container_of(work, struct async_entry, work);
unsigned long flags;
- struct async_entry *entry;
ktime_t calltime, delta, rettime;

- /* 1) pick one task from the pending queue */
-
+ /* 1) move self to the running queue */
spin_lock_irqsave(&async_lock, flags);
- if (list_empty(&async_pending))
- goto out;
- entry = list_first_entry(&async_pending, struct async_entry, list);
-
- /* 2) move it to the running queue */
list_move_tail(&entry->list, entry->running);
spin_unlock_irqrestore(&async_lock, flags);

- /* 3) run it (and print duration)*/
+ /* 2) run (and print duration) */
if (initcall_debug && system_state == SYSTEM_BOOTING) {
printk("calling %lli_%pF @ %i\n", (long long)entry->cookie,
entry->func, task_pid_nr(current));
@@ -152,31 +140,25 @@ static void run_one_entry(void)
(long long)ktime_to_ns(delta) >> 10);
}

- /* 4) remove it from the running queue */
+ /* 3) remove self from the running queue */
spin_lock_irqsave(&async_lock, flags);
list_del(&entry->list);

- /* 5) free the entry */
+ /* 4) free the entry */
kfree(entry);
atomic_dec(&entry_count);

spin_unlock_irqrestore(&async_lock, flags);

- /* 6) wake up any waiters. */
+ /* 5) wake up any waiters */
wake_up(&async_done);
- return;
-
-out:
- spin_unlock_irqrestore(&async_lock, flags);
}

-
static async_cookie_t __async_schedule(async_func_ptr *ptr, void *data, struct list_head *running)
{
struct async_entry *entry;
unsigned long flags;
async_cookie_t newcookie;
-

/* allow irq-off callers */
entry = kzalloc(sizeof(struct async_entry), GFP_ATOMIC);
@@ -185,7 +167,7 @@ static async_cookie_t __async_schedule(a
* If we're out of memory or if there's too much work
* pending already, we execute synchronously.
*/
- if (!async_enabled || !entry || atomic_read(&entry_count) > MAX_WORK) {
+ if (!entry || atomic_read(&entry_count) > MAX_WORK) {
kfree(entry);
spin_lock_irqsave(&async_lock, flags);
newcookie = next_cookie++;
@@ -195,6 +177,7 @@ static async_cookie_t __async_schedule(a
ptr(data, newcookie);
return newcookie;
}
+ INIT_WORK(&entry->work, async_run_entry_fn);
entry->func = ptr;
entry->data = data;
entry->running = running;
@@ -204,7 +187,10 @@ static async_cookie_t __async_schedule(a
list_add_tail(&entry->list, &async_pending);
atomic_inc(&entry_count);
spin_unlock_irqrestore(&async_lock, flags);
- wake_up(&async_new);
+
+ /* schedule for execution */
+ queue_work(system_long_wq, &entry->work);
+
return newcookie;
}

@@ -311,87 +297,3 @@ void async_synchronize_cookie(async_cook
async_synchronize_cookie_domain(cookie, &async_running);
}
EXPORT_SYMBOL_GPL(async_synchronize_cookie);
-
-
-static int async_thread(void *unused)
-{
- DECLARE_WAITQUEUE(wq, current);
- add_wait_queue(&async_new, &wq);
-
- while (!kthread_should_stop()) {
- int ret = HZ;
- set_current_state(TASK_INTERRUPTIBLE);
- /*
- * check the list head without lock.. false positives
- * are dealt with inside run_one_entry() while holding
- * the lock.
- */
- rmb();
- if (!list_empty(&async_pending))
- run_one_entry();
- else
- ret = schedule_timeout(HZ);
-
- if (ret == 0) {
- /*
- * we timed out, this means we as thread are redundant.
- * we sign off and die, but we to avoid any races there
- * is a last-straw check to see if work snuck in.
- */
- atomic_dec(&thread_count);
- wmb(); /* manager must see our departure first */
- if (list_empty(&async_pending))
- break;
- /*
- * woops work came in between us timing out and us
- * signing off; we need to stay alive and keep working.
- */
- atomic_inc(&thread_count);
- }
- }
- remove_wait_queue(&async_new, &wq);
-


- return 0;
-}
-

-static int async_manager_thread(void *unused)
-{
- DECLARE_WAITQUEUE(wq, current);
- add_wait_queue(&async_new, &wq);
-
- while (!kthread_should_stop()) {
- int tc, ec;
-
- set_current_state(TASK_INTERRUPTIBLE);
-
- tc = atomic_read(&thread_count);
- rmb();
- ec = atomic_read(&entry_count);
-
- while (tc < ec && tc < MAX_THREADS) {
- if (IS_ERR(kthread_run(async_thread, NULL, "async/%i",
- tc))) {
- msleep(100);
- continue;
- }
- atomic_inc(&thread_count);
- tc++;
- }
-
- schedule();
- }
- remove_wait_queue(&async_new, &wq);
-


- return 0;
-}
-

-static int __init async_init(void)
-{
- async_enabled =
- !IS_ERR(kthread_run(async_manager_thread, NULL, "async/mgr"));
-
- WARN_ON(!async_enabled);


- return 0;
-}
-

-core_initcall(async_init);

Tejun Heo

unread,
Jan 22, 2010, 6:20:02 AM1/22/10
to
Workqueue can now handle high concurrency. Use system_single_wq
instead of slow-work.

* Updated is_valid_oplock_break() to not call cifs_oplock_break_put()
as advised by Steve French. It might cause deadlock. Instead,
reference is increased after queueing succeeded and
cifs_oplock_break() briefly grabs GlobalSMBSeslock before putting
the cfile to make sure it doesn't put before the matching get is
finished.

Signed-off-by: Tejun Heo <t...@kernel.org>
Cc: Steve French <sfr...@samba.org>
---

Another approach would be to put from a different work which would be
cleaner but needs more code. How does this one look to you?

Thanks.

fs/cifs/Kconfig | 1 -
fs/cifs/cifsfs.c | 6 +-----
fs/cifs/cifsglob.h | 8 +++++---
fs/cifs/dir.c | 2 +-

fs/cifs/file.c | 30 +++++++++++++-----------------
fs/cifs/misc.c | 20 ++++++++++++--------
6 files changed, 32 insertions(+), 35 deletions(-)

Index: work/fs/cifs/cifsfs.c
===================================================================
--- work.orig/fs/cifs/cifsfs.c
+++ work/fs/cifs/cifsfs.c


@@ -1038,15 +1038,10 @@ init_cifs(void)
if (rc)
goto out_unregister_key_type;
#endif
- rc = slow_work_register_user(THIS_MODULE);
- if (rc)
- goto out_unregister_resolver_key;

return 0;

- out_unregister_resolver_key:
#ifdef CONFIG_CIFS_DFS_UPCALL
- unregister_key_type(&key_type_dns_resolver);
out_unregister_key_type:
#endif
#ifdef CONFIG_CIFS_UPCALL
@@ -1069,6 +1064,7 @@ static void __exit
exit_cifs(void)
{
cFYI(DBG2, ("exit_cifs"));
+ flush_workqueue(system_single_wq);
cifs_proc_clean();
#ifdef CONFIG_CIFS_DFS_UPCALL
cifs_dfs_release_automount_timer();

Index: work/fs/cifs/cifsglob.h
===================================================================
--- work.orig/fs/cifs/cifsglob.h
+++ work/fs/cifs/cifsglob.h


@@ -18,7 +18,7 @@
*/
#include <linux/in.h>
#include <linux/in6.h>
-#include <linux/slow-work.h>
+#include <linux/workqueue.h>
#include "cifs_fs_sb.h"
#include "cifsacl.h"
/*
@@ -356,7 +356,7 @@ struct cifsFileInfo {
atomic_t count; /* reference count */
struct mutex fh_mutex; /* prevents reopen race after dead ses*/
struct cifs_search_info srch_inf;
- struct slow_work oplock_break; /* slow_work job for oplock breaks */
+ struct work_struct oplock_break; /* work for oplock breaks */
};

/* Take a reference on the file private data */
@@ -723,4 +723,6 @@ GLOBAL_EXTERN unsigned int cifs_min_rcv;

GLOBAL_EXTERN unsigned int cifs_min_small; /* min size of small buf pool */
GLOBAL_EXTERN unsigned int cifs_max_pending; /* MAX requests at once to server*/

-extern const struct slow_work_ops cifs_oplock_break_ops;
+void cifs_oplock_break(struct work_struct *work);
+void cifs_oplock_break_get(struct cifsFileInfo *cfile);
+void cifs_oplock_break_put(struct cifsFileInfo *cfile);

Index: work/fs/cifs/dir.c
===================================================================
--- work.orig/fs/cifs/dir.c
+++ work/fs/cifs/dir.c


@@ -157,7 +157,7 @@ cifs_new_fileinfo(struct inode *newinode

mutex_init(&pCifsFile->lock_mutex);
INIT_LIST_HEAD(&pCifsFile->llist);
atomic_set(&pCifsFile->count, 1);
- slow_work_init(&pCifsFile->oplock_break, &cifs_oplock_break_ops);
+ INIT_WORK(&pCifsFile->oplock_break, cifs_oplock_break);

write_lock(&GlobalSMBSeslock);
list_add(&pCifsFile->tlist, &cifs_sb->tcon->openFileList);

Index: work/fs/cifs/file.c
===================================================================
--- work.orig/fs/cifs/file.c
+++ work/fs/cifs/file.c


@@ -2276,8 +2276,7 @@ out:
return rc;
}

-static void
-cifs_oplock_break(struct slow_work *work)
+void cifs_oplock_break(struct work_struct *work)
{
struct cifsFileInfo *cfile = container_of(work, struct cifsFileInfo,
oplock_break);

@@ -2316,33 +2315,30 @@ cifs_oplock_break(struct slow_work *work


LOCKING_ANDX_OPLOCK_RELEASE, false);
cFYI(1, ("Oplock release rc = %d", rc));
}
+

+ /*
+ * We might have kicked in before is_valid_oplock_break()
+ * finished grabbing reference for us. Make sure it's done by
+ * waiting for GlobalSMBSeslock.
+ */
+ write_lock(&GlobalSMBSeslock);
+ write_unlock(&GlobalSMBSeslock);

Index: work/fs/cifs/misc.c
===================================================================
--- work.orig/fs/cifs/misc.c
+++ work/fs/cifs/misc.c


@@ -499,7 +499,6 @@ is_valid_oplock_break(struct smb_hdr *bu

struct cifsTconInfo *tcon;
struct cifsInodeInfo *pCifsInode;
struct cifsFileInfo *netfile;
- int rc;

cFYI(1, ("Checking for oplock break or dnotify response"));
if ((pSMB->hdr.Command == SMB_COM_NT_TRANSACT) &&

@@ -584,13 +583,18 @@ is_valid_oplock_break(struct smb_hdr *bu


pCifsInode->clientCanCacheAll = false;
if (pSMB->OplockLevel == 0)
pCifsInode->clientCanCacheRead = false;
- rc = slow_work_enqueue(&netfile->oplock_break);
- if (rc) {
- cERROR(1, ("failed to enqueue oplock "
- "break: %d\n", rc));
- } else {
- netfile->oplock_break_cancelled = false;
- }
+

+ /*
+ * cifs_oplock_break_put() can't be called
+ * from here. Get reference after queueing
+ * succeeded. cifs_oplock_break() will
+ * synchronize using GlobalSMBSeslock.
+ */
+ if (queue_work(system_single_wq,
+ &netfile->oplock_break))
+ cifs_oplock_break_get(netfile);


+ netfile->oplock_break_cancelled = false;
+
read_unlock(&GlobalSMBSeslock);
read_unlock(&cifs_tcp_ses_lock);
return true;

Index: work/fs/cifs/Kconfig
===================================================================
--- work.orig/fs/cifs/Kconfig
+++ work/fs/cifs/Kconfig


@@ -2,7 +2,6 @@ config CIFS
tristate "CIFS support (advanced network filesystem, SMBFS successor)"
depends on INET
select NLS
- select SLOW_WORK
help
This is the client VFS module for the Common Internet File System
(CIFS) protocol which is the successor to the Server Message Block

Jeff Layton

unread,
Jan 22, 2010, 6:50:02 AM1/22/10
to

^^^^
When I backported this before, I put this at the beginning of the
function, not just before the put. I think that this is actually ok
though. We may end up running the function without holding the
references, but the objects we're concerned about can't be freed until
is_valid_oplock_break drops the spinlock.

I think we want to move the setting of netfile->oplock_break_cancelled
inside of the if above it.

If the work is already queued, I don't think we want to set the flag to
false. Doing so might be problematic if we somehow end up processing
this oplock break after a previous oplock break/reconnect/reopen
sequence, but while the initial oplock break is still running.

> read_unlock(&GlobalSMBSeslock);
> read_unlock(&cifs_tcp_ses_lock);
> return true;
> Index: work/fs/cifs/Kconfig
> ===================================================================
> --- work.orig/fs/cifs/Kconfig
> +++ work/fs/cifs/Kconfig
> @@ -2,7 +2,6 @@ config CIFS
> tristate "CIFS support (advanced network filesystem, SMBFS successor)"
> depends on INET
> select NLS
> - select SLOW_WORK
> help
> This is the client VFS module for the Common Internet File System
> (CIFS) protocol which is the successor to the Server Message Block


--
Jeff Layton <jla...@redhat.com>

Tejun Heo

unread,
Jan 24, 2010, 3:30:02 AM1/24/10
to
Hello,

Hmmm.... I can surely do that but that would be different from the
original code. slow_work_enqueue() doesn't distinguish between
successful enqueue and the one which got ignored because the work was
already queued. With conversion to queue_work(), there's no failure
case there so setting oplock_break_cancelled always is equivalent to
the original code. Even if changing it is the right thing to do, it
should probably be done with a separate patch as it changes the logic.
Are you sure it needs to be changed?

Thanks.

--
tejun

Jeff Layton

unread,
Jan 24, 2010, 7:20:02 AM1/24/10
to

I'm pretty sure we do. This flag only gets set to true if there's a
reconnection event. If there is one, then any oplock break queued up
before that happened is now invalid and shouldn't be sent.

It's a fairly minor point however. Even if we send the oplock break,
it's very unlikely to be treated as valid by the server as I don't
think the file would have a chance to be reopened prior to that.

If this is the way that the code works now, then let's go ahead with
your version and I'll plan to queue up a separate patch to change that
behavior after your changes go in.

Thanks,
--
Jeff Layton <jla...@redhat.com>

Tejun Heo

unread,
Jan 25, 2010, 10:30:03 AM1/25/10
to
Hello,

On 01/24/2010 09:13 PM, Jeff Layton wrote:
>> Are you sure it needs to be changed?
>>
>
> I'm pretty sure we do. This flag only gets set to true if there's a
> reconnection event. If there is one, then any oplock break queued up
> before that happened is now invalid and shouldn't be sent.
>
> It's a fairly minor point however. Even if we send the oplock break,
> it's very unlikely to be treated as valid by the server as I don't
> think the file would have a chance to be reopened prior to that.
>
> If this is the way that the code works now, then let's go ahead with
> your version and I'll plan to queue up a separate patch to change that
> behavior after your changes go in.

Yeap, that sounds good to me. Alternatively, I can just queue a separate
patch to do that along with this one so that you don't have to remember
to queue it later.

Thanks.

--
tejun

0 new messages