Streaming (chunked HTTP) to fasthttp.RequestCtx

1,365 views
Skip to first unread message

Miki Tebeka

unread,
Aug 5, 2018, 4:06:59 AM8/5/18
to golang-nuts
Hi,

Since I didn't get an answer on SO, though I'll try here :)

I'd like to let client do streaming API and using fasthttp as the backend.
I don't want to expose the `fasthttp.RequestCtx` to the client and don't want to get a channel from the client function.

Here's the code I have (the client function is `worker`), when I run it and then do `curl -iv http://localhost:8080` it gets stuck. On the server side I see 5 `WRITE: ...` prints. Debugging it seems like the server hangs in `s.writer.Flush()`

Any ideas how to implement this?

package main


import (
   
"bufio"
   
"fmt"
   
"log"


   
"github.com/valyala/fasthttp"
)


var (
    poem
= []string{
       
"The Road goes ever on and on",
       
"Down from the door where it began.",
       
"Now far ahead the Road has gone,",
       
"And I must follow, if I can,",
       
"Pursuing it with eager feet,",
       
"Until it joins some larger way",
       
"Where many paths and errands meet.",
       
"And whither then? I cannot say.",
   
}
)


// Streamer is streaming API
type
Streamer interface {
   
// Write writes bytes to streamer
   
Write(p []byte) (n int, err error)
   
// Flush flushes data to the client
   
Flush() error
   
// SetStatusCode sets the status code. *Must* be called before Write and Flush
   
SetStatusCode(statusCode int) error
   
// SetHeader sets a response header. *Must* be called before Write and Flush
   
SetHeader(key string, value interface{}) error
}


// HTTPStreamer implement Streamer with fasthttp
type
HTTPStreamer struct {
    ctx    
*fasthttp.RequestCtx
   
done   chan bool
    writer
*bufio.Writer
}


// NewHTTPStreamer returns a new HTTPStreamer
func
NewHTTPStreamer(ctx *fasthttp.RequestCtx) *HTTPStreamer {
   
return &HTTPStreamer{
        ctx
: ctx,
   
}
}


// Close closes the stream
func
(s *HTTPStreamer) Close() {
   
if s.done != nil {
        close
(s.done)
   
}
}


// Write writes bytes to streamer
func
(s *HTTPStreamer) Write(p []byte) (n int, err error) {
   
if s.writer == nil {
        s
.done = make(chan bool)
        ready
:= make(chan bool)
        go func
() {
            s
.ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
                s
.writer = w
                ready
<- true // Signal that writer is set
               
<-s.done      // Wait for stream to be closed
           
})
       
}()


       
<-ready // Wait until writer is set
   
}


    fmt
.Printf("WRITE: %s", string(p))
   
return s.writer.Write(p)
}


// Flush flushes data to the client
func
(s *HTTPStreamer) Flush() error {
   
if s.writer != nil {
       
return s.writer.Flush()
   
}


   
return nil
}


// SetStatusCode sets the status code. *Must* be called before Write and Flush
func
(s *HTTPStreamer) SetStatusCode(statusCode int) error {
   
if s.writer != nil {
       
return fmt.Errorf("Streaming started - can't set status")
   
}


    s
.ctx.SetStatusCode(statusCode)
   
return nil
}


// SetHeader sets a response header. *Must* be called before Write and Flush
// value can be string or []byte
func
(s *HTTPStreamer) SetHeader(key string, value interface{}) error {
   
if s.writer != nil {
       
return fmt.Errorf("Streaming started - can't set header")
   
}


   
switch v := value.(type) {
   
case string:
        s
.ctx.Response.Header.Set(key, v)
   
case []byte:
        s
.ctx.Response.Header.SetBytesV(key, v)
   
default:
       
return fmt.Errorf("Unsupported header value type - %T", value)
   
}


   
return nil
}


func worker
(s Streamer) {
    s
.SetStatusCode(201)
    s
.SetHeader("X-T", "VALUE")
   
for _, line := range poem {
        s
.Write([]byte(line + "\n"))
        s
.Flush()
   
}
}


func handler
(ctx *fasthttp.RequestCtx) {
    s
:= NewHTTPStreamer(ctx)
    worker
(s)
    s
.Close()
}


func main
() {
    srv
:= &fasthttp.Server{
       
Handler: handler,
   
}


   
if err := srv.ListenAndServe(":8080"); err != nil {
        log
.Fatal(err)
   
}
}



Reply all
Reply to author
Forward
0 new messages