diff options
-rw-r--r-- | system/api/analytics/batch.go | 50 | ||||
-rw-r--r-- | system/api/analytics/init.go | 28 |
2 files changed, 61 insertions, 17 deletions
diff --git a/system/api/analytics/batch.go b/system/api/analytics/batch.go index 1fee247..1b35339 100644 --- a/system/api/analytics/batch.go +++ b/system/api/analytics/batch.go @@ -3,20 +3,28 @@ package analytics import ( "encoding/json" "strconv" + "time" "github.com/boltdb/bolt" ) // batchInsert is effectively a specialized version of SetContentMulti from the // db package, iterating over a []apiRequest instead of []url.Values -func batchInsert(batch []apiRequest) error { +func batchInsert(requests chan apiRequest) error { + var reqs []apiRequest + batchSize := len(requestChan) + + for i := 0; i < batchSize; i++ { + reqs = append(reqs, <-requestChan) + } + err := store.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte("requests")) if err != nil { return err } - for _, apiReq := range batch { + for _, apiReq := range reqs { // get the next available ID and convert to string // also set effectedID to int of ID id, err := b.NextSequence() @@ -45,3 +53,41 @@ func batchInsert(batch []apiRequest) error { return nil } + +// batchPrune takes a duration to evaluate apiRequest dates against. If any of +// the apiRequest timestamps are before the threshold, they are removed. +// TODO: add feature to alternatively backup old analytics to cloud +func batchPrune(threshold time.Duration) error { + now := time.Now() + today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + max := today.Add(threshold) + + // iterate through all request data + err := store.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("requests")) + + b.ForEach(func(k, v []byte) error { + var r apiRequest + err := json.Unmarshal(v, &r) + if err != nil { + return err + } + + // delete if timestamp is below or equal to max + ts := time.Unix(r.Timestamp, 0) + if ts.Equal(max) || ts.Before(max) { + err := b.Delete(k) + if err != nil { + return err + } + } + + return nil + }) + }) + if err != nil { + return err + } + + return nil +} diff --git a/system/api/analytics/init.go b/system/api/analytics/init.go index fd65a79..722cf0e 100644 --- a/system/api/analytics/init.go +++ b/system/api/analytics/init.go @@ -27,8 +27,8 @@ type apiRequest struct { } var ( - store *bolt.DB - recordChan chan apiRequest + store *bolt.DB + requestChan chan apiRequest ) // Record queues an apiRequest for metrics @@ -45,8 +45,8 @@ func Record(req *http.Request) { External: external, } - // put r on buffered recordChan to take advantage of batch insertion in DB - recordChan <- r + // put r on buffered requestChan to take advantage of batch insertion in DB + requestChan <- r } // Close exports the abillity to close our db file. Should be called with defer @@ -67,7 +67,7 @@ func Init() { log.Fatalln(err) } - recordChan = make(chan apiRequest, 1024*64*runtime.NumCPU()) + requestChan = make(chan apiRequest, 1024*64*runtime.NumCPU()) go serve() @@ -77,31 +77,29 @@ func Init() { } func serve() { - // make timer to notify select to batch request insert from recordChan + // make timer to notify select to batch request insert from requestChan // interval: 30 seconds apiRequestTimer := time.NewTicker(time.Second * 30) // make timer to notify select to remove old analytics // interval: 2 weeks // TODO: enable analytics backup service to cloud - pruneDBTimer := time.NewTicker(time.Hour * 24 * 14) + pruneThreshold := time.Hour * 24 * 14 + pruneDBTimer := time.NewTicker(pruneThreshold) for { select { case <-apiRequestTimer.C: - var reqs []apiRequest - batchSize := len(recordChan) - - for i := 0; i < batchSize; i++ { - reqs = append(reqs, <-recordChan) - } - - err := batchInsert(reqs) + err := batchInsert(requestChan) if err != nil { log.Println(err) } case <-pruneDBTimer.C: + err := batchPrune(pruneThreshold) + if err != nil { + log.Println(err) + } case <-time.After(time.Second * 30): continue |