package main
import "github.com/gocql/gocql"
import (
"time"
"sync"
"fmt"
"log"
"sync/atomic"
)
const max = 3333
const jump = 40
const source = 33
func BenchmarkInsert(exec func(x int) error) {
var wg sync.WaitGroup
t := time.Now()
wg.Add(source)
for y := 0; y < source; y++ {
go func(y int) {
subops := 0
t2 := time.Now()
defer wg.Done()
for x := 1 + max * y / source; x <= max * (y + 1) / source; x++ {
subops++
if err := exec(x); err != nil {
log.Fatal(err)
return
}
}
dur2 := time.Now().Sub(t2)
fmt.Printf("I-%02d: (%.2f ms/op: %d)\n", y, float64(dur2.Nanoseconds()) / 1000000 / float64(subops), subops)
}(y)
}
wg.Wait()
dur := time.Now().Sub(t)
fmt.Printf("INSERT: %v (%.2f ms/op)\n", dur, float64(dur.Nanoseconds()) / 1000000 / max)
}
func BenchmarkUpdate(exec func(x int) error) {
var wg sync.WaitGroup
t := time.Now()
wg.Add(source)
for y := 0; y < source; y++ {
go func(y int) {
subops := 0
t2 := time.Now()
defer wg.Done()
for x := 1 + max / source * y; x <= max / source * (y + 1); x++ {
subops++
if err := exec(x); err != nil {
log.Fatal(err)
return
}
}
dur2 := time.Now().Sub(t2)
fmt.Printf("U-%02d: (%.2f ms/op: %d)\n", y, float64(dur2.Nanoseconds()) / 1000000 / float64(subops), subops)
}(y)
}
wg.Wait()
dur := time.Now().Sub(t)
fmt.Printf("UPDATE: %v (%.2f ms/op)\n", dur, float64(dur.Nanoseconds()) / 1000000 / max)
}
func BenchmarkSelect(exec1, exec2 func(x, lim int) error) {
var wg sync.WaitGroup
t := time.Now()
ops := int64(0)
for y := 2; y < jump; y++ {
wg.Add(1)
go func(y int) {
defer wg.Done()
subops := 0
t2 := time.Now()
for x := max - 1; x > 0; x -= y {
atomic.AddInt64(&ops, 1)
subops++
if err := exec1(x,y*y); err != nil {
log.Fatal(err)
return
}
}
dur2 := time.Now().Sub(t2)
fmt.Printf("S-%02d: (%.2f ms/op: %d)\n", y, float64(dur2.Nanoseconds()) / 1000000 / float64(subops), subops)
}(y)
wg.Add(1)
go func(y int) {
defer wg.Done()
subops := 0
t2 := time.Now()
for x := 1; x < max; x += y {
atomic.AddInt64(&ops, 1)
subops++
if err := exec2(x,y*y); err != nil {
log.Fatal(err)
return
}
}
dur2 := time.Now().Sub(t2)
fmt.Printf("S-%02d: (%.2f ms/op: %d)\n", y + jump, float64(dur2.Nanoseconds()) / 1000000 / float64(subops), subops)
}(y)
}
wg.Wait()
dur := time.Now().Sub(t)
fmt.Printf("SELECT: %v (%.2f ms/op: %d)\n", dur, float64(dur.Nanoseconds()) / 1000000 / float64(ops), ops)
}
func main() {
clust := gocql.NewCluster(`127.0.0.1`)
clust.Timeout = 8 * time.Second
clust.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries:3}
clust.Keyspace = `test4`
db, err := clust.CreateSession()
defer db.Close()
if err != nil {
log.Fatal(err)
return
}
fmt.Println(`test4: scylladb`)
if err := db.Query(`DROP TABLE test4`).Exec(); err != nil {
log.Println(err)
}
if err := db.Query(`CREATE TABLE test4 (bucket text, k text, v TEXT, PRIMARY KEY(bucket,k))`).Exec(); err != nil {
log.Fatal(err)
return
}
BenchmarkInsert(func(x int) error {
err = db.Query(fmt.Sprintf(`INSERT INTO test4(bucket,k,v)VALUES('foo','%05d','%05d')`, x, x)).Exec()
return err
})
BenchmarkUpdate(func(x int) error {
err = db.Query(fmt.Sprintf(`UPDATE test4 SET v = '%06d' WHERE bucket = 'foo' AND k = '%05d'`, x, x)).Exec()
return err
})
BenchmarkSelect(func(x, lim int) error {
sql := fmt.Sprintf(`SELECT k, v FROM test4 WHERE bucket = 'foo' AND k >= '%05d' ORDER BY k ASC LIMIT %d`, x, lim)
iter := db.Query(sql).Iter()
defer iter.Close()
tot := 0
for {
m := map[string]interface{}{}
if !iter.MapScan(m) {
break
}
tot++
}
if tot == 0 {
return fmt.Errorf(`Empty result: `+sql)
}
return nil
}, func(x, lim int) error {
sql := fmt.Sprintf(`SELECT k, v FROM test4 WHERE bucket = 'foo' AND k <= '%05d' ORDER BY k DESC LIMIT %d`, x, lim)
iter := db.Query(sql).Iter()
defer iter.Close()
tot := 0
for {
m := map[string]interface{}{}
if !iter.MapScan(m) {
break
}
tot++
}
if tot == 0 {
return fmt.Errorf(`Empty result: `+sql)
}
return nil
})
}
Sometimes it runs normally, but sometimes it gives something like this:
2017/05/19 14:16:28 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: i/o timeout
2017/05/19 14:16:33 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: i/o timeout
2017/05/19 14:20:30 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: i/o timeout
2017/05/19 14:18:28 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: getsockopt: connection refused
2017/05/19 14:18:44 gocql: unable to create session: control: unable to connect to initial hosts: dial tcp
127.0.0.1:9042: i/o timeout
2017/05/19 14:20:34 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: i/o timeout
2017/05/19 14:20:37 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: i/o timeout
2017/05/19 14:27:46 gocql: unable to create session: unable to discover protocol version: dial tcp
127.0.0.1:9042: i/o timeout