I tried writing this HTTP POST request, which streams the request body line by line as JSON.
(This is for an Elasticsearch bulk API request, which takes newline-separated JSON in the request body.) I didn't think the WaitGroup was necessary, but apparently it is. Does anyone know why? I assume the need for the WaitGroup might indicate a bug in my code.
// doBulkUpsertWithResponse streams z to the "<baseUrl>/_bulk" endpoint as
// newline-delimited JSON and returns the decoded JSON response body.
//
// Each document contributes two lines to the request body: an "update" action
// line carrying the document id, followed by the JSON encoding of an
// ElasticBulkUpsertable wrapping the document. The body is produced by a
// goroutine writing into an io.Pipe, so the payload is never held in memory
// as a whole.
//
// It returns (nil, nil) when z is empty. It returns a nil map and an error
// when the request cannot be built or sent, when the response status is not
// 200, or when the response body cannot be decoded as JSON. Documents that
// fail to marshal are logged and left out of the request.
func doBulkUpsertWithResponse(baseUrl string, z []UpsertDoc) (*map[string]interface{}, error) {
	if len(z) < 1 {
		return nil, nil
	}

	fullUrl := fmt.Sprintf("%s/%s", baseUrl, "_bulk")
	rd, wr := io.Pipe()

	// Build the request before starting the writer: if this fails, no
	// goroutine exists yet that could stay blocked on a pipe nobody reads.
	req, err := http.NewRequest("POST", fullUrl, rd)
	if err != nil {
		log.Println(fmt.Errorf("cm:3ec045d3-4790-4d7a-aab8-65949933d263: '%v'", err))
		return nil, err
	}
	// NOTE(review): the Elasticsearch bulk documentation names
	// application/x-ndjson for this endpoint; application/json is kept here to
	// preserve the existing behavior — confirm against the target cluster.
	req.Header.Set("Content-Type", "application/json")

	// We stream JSON lines to the POST request from a separate goroutine.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: the pipe is closed (signalling
		// EOF to the reader) before the WaitGroup is released.
		defer wg.Done()
		defer wr.Close()

		for _, v := range z {
			doc := ElasticBulkUpsertable{
				&v.Doc, true,
			}
			// Marshal the document before emitting anything for it, so a
			// failure never leaves an action line without its document line.
			docJSON, err := json.Marshal(&doc)
			if err != nil {
				log.Printf("bulk upsert: skipping document '%v': %v", v.Id, err)
				continue
			}

			// Encode the id as a JSON string so quotes, backslashes and
			// control characters in it cannot corrupt the action line.
			idJSON, err := json.Marshal(fmt.Sprintf("%s", v.Id))
			if err != nil {
				log.Printf("bulk upsert: skipping document '%v': %v", v.Id, err)
				continue
			}

			// Send the action line and the document line in a single write so
			// the pair always reaches the reader together.
			chunk := []byte(fmt.Sprintf(`{"update":{"_id":%s}}`, idJSON) + "\n")
			chunk = append(chunk, docJSON...)
			chunk = append(chunk, '\n')

			if _, err := wr.Write(chunk); err != nil {
				// The read side has been closed; every later write would fail
				// the same way, so stop producing.
				return
			}
		}
	}()

	resp, err := client.Do(req)
	if err != nil {
		// Closing the read side makes any pending or future wr.Write fail, so
		// the writer cannot stay blocked if the body was not consumed.
		rd.Close()
	}
	// Do can return before the transport is finished with the request body,
	// so wait here to guarantee the writer goroutine has exited before this
	// function returns.
	wg.Wait()
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, errors.New("request failed with status code: " + resp.Status)
	}

	var m *map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
		return nil, err
	}
	return m, nil
}