cmd/pk-devimport: command that runs a local importer

0 views
Skip to first unread message

nor...@camlistore.org

unread,
Dec 20, 2017, 3:04:57 PM
to camlistor...@googlegroups.com


https://camlistore.googlesource.com/camlistore/+/99e71732b9c5f9883ddd72f08eeee48cd0d67d86

commit 99e71732b9c5f9883ddd72f08eeee48cd0d67d86
Author: mpl <mathieu....@gmail.com>
Date: Mon Dec 18 18:49:01 2017 +0100

cmd/pk-devimport: command that runs a local importer

Change-Id: I62e35db0040cda51f2bc5ede4560974e3f3442cc

diff --git a/cmd/pk-devimport/devimport.go b/cmd/pk-devimport/devimport.go
new file mode 100644
index 0000000..f05ccfb
--- /dev/null
+++ b/cmd/pk-devimport/devimport.go
@@ -0,0 +1,177 @@
+/*
+Copyright 2017 The Camlistore Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// The pk-devimport command runs an importer, using the importer code linked into the binary,
+// against a Perkeep server. This enables easier interactive development of importers,
+// without having to restart a server.
+package main
+
+import (
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "net/url"
+ "os"
+
+ "camlistore.org/pkg/blob"
+ "camlistore.org/pkg/client"
+ "camlistore.org/pkg/importer"
+ "camlistore.org/pkg/osutil"
+ "camlistore.org/pkg/search"
+
+ _ "camlistore.org/pkg/importer/allimporters"
+)
+
+// serverFlagHelp is the help text fragment describing the values accepted
+// by the -server flag; it is appended to the flag's own description.
+const serverFlagHelp = "Format is either a URL prefix (with optional path), a host[:port], a config file server alias, or blank to use the Camlistore client config's default server."
+
+// newClient builds the Perkeep client used to talk to server.
+//
+// server selects the target:
+//   * "" (blank): the default server from the client config file
+//   * an alias: the server registered under that name in the config file
+//   * host:port
+//   * https?://host[:port][/path]
+//
+// Any opts are forwarded to the client constructor. When an explicit server
+// is given and its authentication cannot be set up, the process exits via
+// log.Fatalf.
+func newClient(server string, opts ...client.ClientOption) *client.Client {
+	if server != "" {
+		c := client.New(server, opts...)
+		err := c.SetupAuth()
+		if err != nil {
+			log.Fatalf("Could not setup auth for connecting to %v: %v", server, err)
+		}
+		return c
+	}
+	// No server given: let the client package pick its configured default.
+	return client.NewOrFail(opts...)
+}
+
+var (
+ flagServer = flag.String("server", "", "Server to search. "+serverFlagHelp)
+)
+
+func usage() {
+ fmt.Fprintf(os.Stderr,
+ `Usage: pk-devimport <importerType> <accountNodeRef>
+
+-importerType is one of the supported importers: feed, flickr, foursquare, gphotos, pinboard, plaid, twitter.
+-accountNodeRef is the permanode of camliNodeType importerAccount representing the account to import.
+`)
+}
+
+// newImporterHost creates an importer.Host that imports into the given
+// server on behalf of a single importer type.
+//
+// The client obtained from newClient serves as blob target, blob source and
+// search handle of the host. The OAuth client ID and secret for importerType
+// are looked up on the server with getCredentials. Any failure to get the
+// signer, the blob root or the credentials is returned unchanged.
+func newImporterHost(server string, importerType string) (*importer.Host, error) {
+	c := newClient(server)
+
+	// To avoid the ExplicitSecretRingFile panic when setting up the signer.
+	osutil.AddSecretRingFlag()
+	sig, err := c.Signer()
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(mpl): technically not true, but works for now.
+	root, err := c.BlobRoot()
+	if err != nil {
+		return nil, err
+	}
+
+	id, secret, err := getCredentials(c, importerType)
+	if err != nil {
+		return nil, err
+	}
+
+	// Only the requested importer type gets credentials.
+	ids := map[string]string{importerType: id}
+	secrets := map[string]string{importerType: secret}
+
+	return importer.NewHost(importer.HostConfig{
+		BaseURL:      root,
+		Prefix:       "/importer/", // TODO(mpl): do not hardcode this prefix
+		Target:       c,
+		BlobSource:   c,
+		Signer:       sig,
+		Search:       c,
+		ClientId:     ids,
+		ClientSecret: secrets,
+	})
+}
+
+// getCredentials returns the OAuth clientID and clientSecret found in the
+// importer node of the given importerType.
+//
+// It queries sh for permanodes whose camliNodeType is "importer" and whose
+// importerType matches, then reads the authClientID and authClientSecret
+// attributes of the first result that the describe response reports as a
+// permanode.
+//
+// An error is returned if the query fails, if no result is described as a
+// permanode, if that permanode does not carry the expected camliNodeType and
+// importerType, or if either credential attribute does not hold exactly one
+// value. Both returned strings are empty whenever the error is non-nil.
+func getCredentials(sh search.QueryDescriber, importerType string) (string, string, error) {
+	res, err := sh.Query(&search.SearchQuery{
+		Expression: "attr:camliNodeType:importer and attr:importerType:" + importerType,
+		Describe: &search.DescribeRequest{
+			Depth: 1,
+		},
+	})
+	if err != nil {
+		return "", "", err
+	}
+	if res.Describe == nil {
+		return "", "", errors.New("no importer node found")
+	}
+
+	// Pick the first search result that is described as a permanode.
+	var attrs url.Values
+	found := false
+	for _, resBlob := range res.Blobs {
+		desBlob, ok := res.Describe.Meta[resBlob.Blob.String()]
+		if !ok || desBlob.Permanode == nil {
+			continue
+		}
+		attrs = desBlob.Permanode.Attr
+		found = true
+		break
+	}
+	if !found {
+		// Without this check, an empty result set would be misreported
+		// further down as a missing credential attribute.
+		return "", "", errors.New("no importer node found")
+	}
+
+	// Double-check that the server returned what was asked for.
+	if attrs.Get("camliNodeType") != "importer" {
+		return "", "", errors.New("search result returned non importer node")
+	}
+	if t := attrs.Get("importerType"); t != importerType {
+		return "", "", fmt.Errorf("search result returned importer node of the wrong type: %v", t)
+	}
+
+	const attrClientID, attrClientSecret = "authClientID", "authClientSecret"
+	ids := attrs[attrClientID]
+	if len(ids) != 1 {
+		return "", "", fmt.Errorf("no %v attribute", attrClientID)
+	}
+	secrets := attrs[attrClientSecret]
+	if len(secrets) != 1 {
+		return "", "", fmt.Errorf("no %v attribute", attrClientSecret)
+	}
+	return ids[0], secrets[0], nil
+}
+
+// main parses the command line, validates the two positional arguments
+// (importer type and account permanode ref), and runs that importer account
+// against the server selected by -server. Wrong argument count prints the
+// usage and exits with status 2; every other failure ends in log.Fatal.
+func main() {
+	flag.Parse()
+	if flag.NArg() != 2 {
+		usage()
+		os.Exit(2)
+	}
+	importerType, nodeArg := flag.Arg(0), flag.Arg(1)
+
+	// The importer must be one of those linked into this binary.
+	if _, known := importer.All()[importerType]; !known {
+		log.Fatalf("%v is not a valid importer name", importerType)
+	}
+	accountRef, valid := blob.Parse(nodeArg)
+	if !valid {
+		log.Fatalf("Not a valid blob ref: %q", nodeArg)
+	}
+
+	host, err := newImporterHost(*flagServer, importerType)
+	if err != nil {
+		log.Fatal(err)
+	}
+	err = host.RunImporterAccount(importerType, accountRef)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
diff --git a/pkg/importer/importer.go b/pkg/importer/importer.go
index 8288d1e..ba3316a 100644
--- a/pkg/importer/importer.go
+++ b/pkg/importer/importer.go
@@ -425,6 +425,27 @@ func (h *Host) AccountsStatus() (interface{}, []camtypes.StatusError) {
return s, errs
}

+// RunImporterAccount runs the importer registered under importerType for the
+// single account whose permanode is accountNode.
+//
+// It first waits on h.didInit, then looks the importer up in h.imp and scans
+// its accounts. The result of running the matching account is returned as
+// is. An error is returned if the importer type is unknown to this host, if
+// its accounts cannot be listed, or if none of them matches accountNode.
+func (h *Host) RunImporterAccount(importerType string, accountNode blob.Ref) error {
+	h.didInit.Wait()
+	imp, found := h.imp[importerType]
+	if !found {
+		return fmt.Errorf("no %q importer for this account", importerType)
+	}
+	accts, err := imp.Accounts()
+	if err != nil {
+		return err
+	}
+	for _, candidate := range accts {
+		if candidate.acct.pn == accountNode {
+			return candidate.run()
+		}
+	}
+	return fmt.Errorf("no %v account matching account in node %v", importerType, accountNode)
+}
+
func (h *Host) InitHandler(hl blobserver.FindHandlerByTyper) error {
if prefix, _, err := hl.FindHandlerByType("ui"); err == nil {
h.uiPrefix = prefix
@@ -1150,6 +1171,33 @@ func (ia *importerAcct) start() {
}()
}

+// run executes one import for this account synchronously and returns the
+// error reported by the importer implementation, if any.
+//
+// It refuses to start while another run is recorded in ia.current. The
+// bookkeeping fields (current, stopped, lastRunStart, lastRunDone,
+// lastRunErr) are updated under ia.mu. The mutex is released while the
+// importer itself runs, so that other methods taking ia.mu (such as stop)
+// are not blocked for the whole duration of the import. The run state is
+// cleared whether the import succeeds or fails.
+//
+// TODO(mpl): review this code more carefully. mostly copied from start().
+func (ia *importerAcct) run() error {
+	ia.mu.Lock()
+	if ia.current != nil {
+		ia.mu.Unlock()
+		return errors.New("importer account is already running")
+	}
+	rc := &RunContext{
+		Host: ia.im.host,
+		ia:   ia,
+	}
+	rc.ctx, rc.cancel = context.WithCancel(context.WithValue(context.Background(), ctxutil.HTTPClient, ia.im.host.HTTPClient()))
+	ia.current = rc
+	ia.stopped = false
+	ia.lastRunStart = time.Now()
+	log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
+	ia.mu.Unlock()
+
+	err := ia.im.impl.Run(rc)
+	if err == nil {
+		log.Printf("%v finished.", ia)
+	}
+
+	// Record the outcome and mark the account idle again, even on failure,
+	// so that a failed run does not leave the account looking busy.
+	ia.mu.Lock()
+	defer ia.mu.Unlock()
+	ia.current = nil
+	ia.stopped = false
+	ia.lastRunDone = time.Now()
+	ia.lastRunErr = err
+	return err
+}
+
func (ia *importerAcct) stop() {
ia.mu.Lock()
defer ia.mu.Unlock()
Reply all
Reply to author
Forward
0 new messages