-rw-r--r--  system/api/analytics/batch.go   50
-rw-r--r--  system/api/analytics/init.go    28
2 files changed, 61 insertions, 17 deletions
diff --git a/system/api/analytics/batch.go b/system/api/analytics/batch.go
index 1fee247..1b35339 100644
--- a/system/api/analytics/batch.go
+++ b/system/api/analytics/batch.go
@@ -3,20 +3,28 @@ package analytics
import (
	"encoding/json"
	"strconv"
+	"time"

	"github.com/boltdb/bolt"
)
// batchInsert is effectively a specialized version of SetContentMulti from the
// db package, iterating over a []apiRequest instead of []url.Values
-func batchInsert(batch []apiRequest) error {
+func batchInsert(requests chan apiRequest) error {
+	// drain only the requests already buffered on the channel so the batch
+	// has a fixed size and the receive loop cannot block
+	var reqs []apiRequest
+	batchSize := len(requests)
+
+	for i := 0; i < batchSize; i++ {
+		reqs = append(reqs, <-requests)
+	}
+
	err := store.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("requests"))
		if err != nil {
			return err
		}

-		for _, apiReq := range batch {
+		for _, apiReq := range reqs {
			// get the next available ID and convert to string
			// also set effectedID to int of ID
			id, err := b.NextSequence()
@@ -45,3 +53,41 @@ func batchInsert(batch []apiRequest) error {
	return nil
}
+
+// batchPrune takes a duration to evaluate apiRequest dates against. If any of
+// the apiRequest timestamps are older than the threshold, they are removed.
+// TODO: add feature to alternatively backup old analytics to cloud
+func batchPrune(threshold time.Duration) error {
+	now := time.Now()
+	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+	// cutoff is the threshold duration in the past; anything at or before it is pruned
+	max := today.Add(-threshold)
+
+	// iterate through all request data
+	err := store.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket([]byte("requests"))
+
+		return b.ForEach(func(k, v []byte) error {
+			var r apiRequest
+			err := json.Unmarshal(v, &r)
+			if err != nil {
+				return err
+			}
+
+			// delete if timestamp is equal to or before the cutoff
+			ts := time.Unix(r.Timestamp, 0)
+			if ts.Equal(max) || ts.Before(max) {
+				err := b.Delete(k)
+				if err != nil {
+					return err
+				}
+			}
+
+			return nil
+		})
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
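
As an aside, here is a minimal standalone sketch (not part of this patch) of how the prune cutoff behaves, assuming apiRequest.Timestamp holds Unix seconds as the time.Unix(r.Timestamp, 0) call above implies:

package main

import (
	"fmt"
	"time"
)

func main() {
	// mirror batchPrune: truncate "now" to midnight UTC, then step back by
	// the prune threshold that serve() passes in (14 days)
	threshold := time.Hour * 24 * 14
	now := time.Now()
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
	max := today.Add(-threshold)

	// a request stamped 15 days ago lands before the cutoff and would be
	// pruned; one from an hour ago would be kept
	old := time.Unix(now.AddDate(0, 0, -15).Unix(), 0)
	recent := time.Unix(now.Add(-time.Hour).Unix(), 0)
	fmt.Println(old.Equal(max) || old.Before(max))       // true
	fmt.Println(recent.Equal(max) || recent.Before(max)) // false
}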
diff --git a/system/api/analytics/init.go b/system/api/analytics/init.go
index fd65a79..722cf0e 100644
--- a/system/api/analytics/init.go
+++ b/system/api/analytics/init.go
@@ -27,8 +27,8 @@ type apiRequest struct {
}

var (
-	store      *bolt.DB
-	recordChan chan apiRequest
+	store       *bolt.DB
+	requestChan chan apiRequest
)

// Record queues an apiRequest for metrics
@@ -45,8 +45,8 @@ func Record(req *http.Request) {
		External: external,
	}

-	// put r on buffered recordChan to take advantage of batch insertion in DB
-	recordChan <- r
+	// put r on buffered requestChan to take advantage of batch insertion in DB
+	requestChan <- r
}
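
For context, a hypothetical caller sketch (handler name and import path are illustrative assumptions, not part of this change): an API handler queues its request for metrics by handing the *http.Request to Record, which only pushes onto the buffered requestChan and never touches the DB directly.

package api

import (
	"net/http"

	// import path assumed for illustration; use this repository's actual
	// analytics package path
	"github.com/ponzu-cms/ponzu/system/api/analytics"
)

func contentHandler(res http.ResponseWriter, req *http.Request) {
	analytics.Record(req) // queued; batchInsert drains the channel every 30 seconds
	res.Write([]byte(`{"data":[]}`))
}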
// Close exports the ability to close our db file. Should be called with defer
@@ -67,7 +67,7 @@ func Init() {
		log.Fatalln(err)
	}

-	recordChan = make(chan apiRequest, 1024*64*runtime.NumCPU())
+	requestChan = make(chan apiRequest, 1024*64*runtime.NumCPU())

	go serve()
@@ -77,31 +77,29 @@ func Init() {
}

func serve() {
-	// make timer to notify select to batch request insert from recordChan
+	// make timer to notify select to batch request insert from requestChan
	// interval: 30 seconds
	apiRequestTimer := time.NewTicker(time.Second * 30)

	// make timer to notify select to remove old analytics
	// interval: 2 weeks
	// TODO: enable analytics backup service to cloud
-	pruneDBTimer := time.NewTicker(time.Hour * 24 * 14)
+	pruneThreshold := time.Hour * 24 * 14
+	pruneDBTimer := time.NewTicker(pruneThreshold)
	for {
		select {
		case <-apiRequestTimer.C:
-			var reqs []apiRequest
-			batchSize := len(recordChan)
-
-			for i := 0; i < batchSize; i++ {
-				reqs = append(reqs, <-recordChan)
-			}
-
-			err := batchInsert(reqs)
+			err := batchInsert(requestChan)
			if err != nil {
				log.Println(err)
			}
		case <-pruneDBTimer.C:
+			err := batchPrune(pruneThreshold)
+			if err != nil {
+				log.Println(err)
+			}
		case <-time.After(time.Second * 30):
			continue