feat: bulk insert

This commit is contained in:
yzqzss 2024-06-17 00:56:44 +08:00
parent 72ee93802d
commit 625250bf12
2 changed files with 124 additions and 0 deletions

123
api_v1.go
View File

@ -2,6 +2,8 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
@ -194,6 +196,127 @@ func v1_update_task(c *gin.Context) {
})
}
type Item struct {
Item_id string `json:"item_id"`
Item_id_type string `json:"item_id_type"`
Item_status string `json:"item_status"`
Item_status_type string `json:"item_status_type"`
Payload string `json:"payload"`
}
func v1_insert_many(c *gin.Context) {
identifier := c.Param("identifier")
client_version := c.Param("client_version")
archivist := c.Param("archivist")
top_items_rawString := c.PostForm("items")
if is_safe_sting(identifier) && is_safe_sting(archivist) {
// OK
} else {
c.JSON(400, gin.H{"error": "Invalid parameter or query string"})
return
}
project := GetProject(identifier)
if project == nil {
c.JSON(404, gin.H{
"error": fmt.Sprintf("Project %s not found", identifier),
})
return
}
if client_version != project.Client.Version {
c.JSON(400, gin.H{
"error": "Client version not supported",
"msg": fmt.Sprintf("Please update to version %s", project.Client.Version),
})
return
}
db := mongoClient.Database(project.Mongodb.DbName)
item_collection := db.Collection(project.Mongodb.ItemCollection)
topItems := []Item{}
err := json.Unmarshal([]byte(top_items_rawString), &topItems)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid JSON items"})
panic(err)
}
var doc_id_name string
if project.Mongodb.CustomDocIDName != "" {
doc_id_name = project.Mongodb.CustomDocIDName
} else {
doc_id_name = DEFAULT_DOC_ID_NAME
}
documents := []interface{}{}
for _, item := range topItems {
document := bson.M{}
// id
if item.Item_id_type == "str" {
document[doc_id_name] = item.Item_id
} else if item.Item_id_type == "int" {
item_id_int, err := strconv.ParseInt(item.Item_id, 10, 64)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid item_id"})
return
}
document[doc_id_name] = item_id_int
} else {
c.JSON(400, gin.H{"error": "Invalid task_id_type"})
return
}
// status
if item.Item_status_type == "str" {
document["status"] = item.Item_status
} else if item.Item_status_type == "int" {
status, err := strconv.ParseInt(item.Item_status, 10, 64)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid item_status"})
return
}
document["status"] = status
} else if item.Item_status_type == "None" {
document["status"] = nil
} else {
c.JSON(400, gin.H{"error": "Invalid status_type"})
return
}
// payload
var payload_BSON primitive.M
err := bson.UnmarshalExtJSON([]byte(item.Payload), true, &payload_BSON)
if err != nil {
c.JSON(400, gin.H{"error": "Invalid JSON payload"})
panic(err)
}
document["payload"] = payload_BSON
documents = append(documents, document)
}
// do insert, sorted=false
opt := options.InsertMany().SetOrdered(false)
result, err := item_collection.InsertMany(context.TODO(), documents, opt)
// if err is BulkWriteException
if err != nil {
if !errors.As(err, &mongo.BulkWriteException{}) {
c.JSON(500, gin.H{"error": "Failed to insert items"})
panic(err)
}
// BulkWriteException is expected
}
bulkWriteException, _ := err.(mongo.BulkWriteException)
c.JSON(200, gin.H{
"InsertedIDs": result.InsertedIDs,
"msg": "Items bulk insert actions done successfully",
"WriteErrors": len(bulkWriteException.WriteErrors),
"Labels": len(bulkWriteException.Labels),
"WriteConcernError": bulkWriteException.WriteConcernError,
})
}
func v1_insert_item(c *gin.Context) {
identifier := c.Param("identifier")
client_version := c.Param("client_version")

View File

@ -56,6 +56,7 @@ func main() {
v1_tracker.POST("/project/:identifier/:client_version/:archivist/claim_task", v1_claim_task)
v1_tracker.POST("/project/:identifier/:client_version/:archivist/update_task/:task_id", v1_update_task)
v1_tracker.POST("/project/:identifier/:client_version/:archivist/insert_item/:item_id", v1_insert_item)
v1_tracker.POST("/project/:identifier/:client_version/:archivist/insert_many/:size", v1_insert_many) // :size unused
}
r.Run()
}