How to do an atomic increment operation on an etcdv3 key?

刘韦菠

Sep 7, 2017, 2:10:00 AM
to etcd-dev
It seems there was such an RPC call in etcd v2, but etcd v3 has a different API. I have tried to figure it out but have no idea how to write the code.
Could someone please give me some hints or example code?

anthony...@coreos.com

Sep 7, 2017, 12:41:54 PM
to etcd-dev
A Put on a key will increment its version, so that can be used as a counter. If data in a key needs to be atomically updated, Txn can be used to do compare-and-set style operations.
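
To make the compare-and-set idea concrete, here is a minimal sketch of the retry loop. It does not talk to etcd: the `kv` type and its `get` and `putIfModRev` methods are hypothetical in-memory stand-ins, invented for illustration, so that the example runs on its own. With the real client, the conditional write would be a Txn that compares the key's ModRevision and then does a Put, as noted in the comment.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// kv is a hypothetical in-memory stand-in for a single etcd key.
// Real etcd uses a cluster-wide revision; a per-key counter is
// enough for this sketch.
type kv struct {
	mu     sync.Mutex
	value  string
	modRev int64
}

// get returns the current value and the revision at which it was last modified.
func (s *kv) get() (string, int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value, s.modRev
}

// putIfModRev writes newValue only if the key is unchanged since rev was read.
// With clientv3 this corresponds roughly to:
//
//	cli.Txn(ctx).
//		If(clientv3.Compare(clientv3.ModRevision(key), "=", rev)).
//		Then(clientv3.OpPut(key, newValue)).
//		Commit()
//
// followed by a check of the response's Succeeded field.
func (s *kv) putIfModRev(rev int64, newValue string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.modRev != rev {
		return false // someone else wrote first; the caller must retry
	}
	s.value = newValue
	s.modRev++
	return true
}

// increment repeats read, add one, conditional write until the write succeeds.
func increment(s *kv) {
	for {
		cur, rev := s.get()
		n, _ := strconv.ParseInt(cur, 10, 64) // empty or invalid value counts as 0
		if s.putIfModRev(rev, strconv.FormatInt(n+1, 10)) {
			return
		}
	}
}

// runIncrements performs n concurrent increments and returns the final value.
func runIncrements(n int) string {
	s := &kv{}
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			increment(s)
		}()
	}
	wg.Wait()
	v, _ := s.get()
	return v
}

func main() {
	fmt.Println(runIncrements(500))
}
```

Because a losing writer simply re-reads and tries again, no increment is lost even when many goroutines race on the same key.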

刘韦菠

Sep 8, 2017, 6:47:03 AM
to etcd-dev


On Thursday, September 7, 2017 at 2:10:00 PM UTC+8, 刘韦菠 wrote:
It seems there was such an RPC call in etcd v2, but etcd v3 has a different API. I have tried to figure it out but have no idea how to write the code.
Could someone please give me some hints or example code?

package main

import (
    //"context"
    "fmt"
    "log"

    v3 "github.com/coreos/etcd/clientv3"
    conc "github.com/coreos/etcd/clientv3/concurrency"
)

const (
    KEY = "/KEY/T"
)

func main() {
    cli, err := v3.New(v3.Config{Endpoints: []string{"http://192.168.6.102:2379"}})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()
    fmt.Println("create v3 client done")

    // Optionally seed the key first (needs the "context" import above).
    /*
        if _, err = cli.Put(context.TODO(), KEY, "100"); err != nil {
            log.Fatal(err)
        }
    */

    // addFunc runs inside an STM transaction: it reads the current value,
    // adds one, and writes it back. STM retries it on a read conflict.
    addFunc := func(stm conc.STM) error {
        valueString := stm.Get(KEY)
        var value int64
        // A missing or non-numeric value leaves value at 0.
        fmt.Sscanf(valueString, "%d", &value)
        value++
        stm.Put(KEY, fmt.Sprintf("%d", value))
        return nil
    }
    for i := 0; i < 500; i++ {
        if _, err = conc.NewSTM(cli, addFunc); err != nil {
            log.Fatal(err)
        }
    }
    fmt.Println("atomic add done")
}

Here is my example code, worked out after studying the etcd documentation for a long time. I hope it helps those who come here later.
 

anthony...@coreos.com

Sep 8, 2017, 12:44:09 PM
to etcd-dev
If the intent is to only store the counter in the value, STM is more expensive than necessary. STM costs at least one extra round trip, and more if there are read conflicts.

Using the key version is more efficient and faster:

Increment the counter 500 times atomically:

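var wg sync.WaitGroup
wg.Add(500) // must match the number of goroutines started below

for i := 0; i < 500; i++ {
    go func() {
        defer wg.Done()
        // Each Put bumps the key's version by one; the value is irrelevant.
        if _, err := cli.Put(context.TODO(), "key", ""); err != nil {
            log.Print(err)
        }
    }()
}
wg.Wait()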

Read the counter:

func getCounter(cli *clientv3.Client) int64 {
    resp, err := cli.Get(context.TODO(), "key")
    if err != nil {
        panic(err)
    }
    if len(resp.Kvs) == 0 {
        return 0
    }
    return resp.Kvs[0].Version
}