Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async clean log #5434

Merged
merged 3 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion service/aiproxy/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ var (

var (
disableServe atomic.Bool
logStorageHours int64 = 0 // default 0 means no limit
saveAllLogDetail atomic.Bool
logDetailRequestBodyMaxSize int64 = 128 * 1024 // 128KB
logDetailResponseBodyMaxSize int64 = 128 * 1024 // 128KB
logDetailStorageHours int64 = 3 * 24
logDetailStorageHours int64 = 3 * 24 // 3 days
internalToken atomic.Value
)

Expand Down Expand Up @@ -99,6 +100,15 @@ func SetTimeoutWithModelType(timeout map[int]int64) {
timeoutWithModelType.Store(timeout)
}

func GetLogStorageHours() int64 {
return atomic.LoadInt64(&logStorageHours)
}

func SetLogStorageHours(hours int64) {
hours = env.Int64("LOG_STORAGE_HOURS", hours)
atomic.StoreInt64(&logStorageHours, hours)
}

func GetLogDetailStorageHours() int64 {
return atomic.LoadInt64(&logDetailStorageHours)
}
Expand Down
4 changes: 2 additions & 2 deletions service/aiproxy/controller/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func GetChannels(c *gin.Context) {
channelType, _ := strconv.Atoi(c.Query("channel_type"))
baseURL := c.Query("base_url")
order := c.Query("order")
channels, total, err := model.GetChannels(page*perPage, perPage, id, name, key, channelType, baseURL, order)
channels, total, err := model.GetChannels(page, perPage, id, name, key, channelType, baseURL, order)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand Down Expand Up @@ -86,7 +86,7 @@ func SearchChannels(c *gin.Context) {
channelType, _ := strconv.Atoi(c.Query("channel_type"))
baseURL := c.Query("base_url")
order := c.Query("order")
channels, total, err := model.SearchChannels(keyword, page*perPage, perPage, id, name, key, channelType, baseURL, order)
channels, total, err := model.SearchChannels(keyword, page, perPage, id, name, key, channelType, baseURL, order)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions service/aiproxy/controller/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (g *GroupResponse) MarshalJSON() ([]byte, error) {
func GetGroups(c *gin.Context) {
page, perPage := parsePageParams(c)
order := c.DefaultQuery("order", "")
groups, total, err := model.GetGroups(page*perPage, perPage, order, false)
groups, total, err := model.GetGroups(page, perPage, order, false)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand All @@ -56,7 +56,7 @@ func SearchGroups(c *gin.Context) {
page, perPage := parsePageParams(c)
order := c.DefaultQuery("order", "")
status, _ := strconv.Atoi(c.Query("status"))
groups, total, err := model.SearchGroup(keyword, page*perPage, perPage, order, status)
groups, total, err := model.SearchGroup(keyword, page, perPage, order, status)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions service/aiproxy/controller/modelconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func GetModelConfigs(c *gin.Context) {
page, perPage := parsePageParams(c)
_model := c.Query("model")
configs, total, err := model.GetModelConfigs(page*perPage, perPage, _model)
configs, total, err := model.GetModelConfigs(page, perPage, _model)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand Down Expand Up @@ -55,7 +55,7 @@ func SearchModelConfigs(c *gin.Context) {
page, perPage := parsePageParams(c)
_model := c.Query("model")
owner := c.Query("owner")
configs, total, err := model.SearchModelConfigs(keyword, page*perPage, perPage, _model, model.ModelOwner(owner))
configs, total, err := model.SearchModelConfigs(keyword, page, perPage, _model, model.ModelOwner(owner))
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand Down
8 changes: 4 additions & 4 deletions service/aiproxy/controller/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func GetTokens(c *gin.Context) {
order := c.Query("order")
status, _ := strconv.Atoi(c.Query("status"))

tokens, total, err := model.GetTokens(group, page*perPage, perPage, order, status)
tokens, total, err := model.GetTokens(group, page, perPage, order, status)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand All @@ -134,7 +134,7 @@ func GetGroupTokens(c *gin.Context) {
order := c.Query("order")
status, _ := strconv.Atoi(c.Query("status"))

tokens, total, err := model.GetTokens(group, page*perPage, perPage, order, status)
tokens, total, err := model.GetTokens(group, page, perPage, order, status)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand All @@ -155,7 +155,7 @@ func SearchTokens(c *gin.Context) {
status, _ := strconv.Atoi(c.Query("status"))
group := c.Query("group")

tokens, total, err := model.SearchTokens(group, keyword, page*perPage, perPage, order, status, name, key)
tokens, total, err := model.SearchTokens(group, keyword, page, perPage, order, status, name, key)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand All @@ -181,7 +181,7 @@ func SearchGroupTokens(c *gin.Context) {
key := c.Query("key")
status, _ := strconv.Atoi(c.Query("status"))

tokens, total, err := model.SearchTokens(group, keyword, page*perPage, perPage, order, status, name, key)
tokens, total, err := model.SearchTokens(group, keyword, page, perPage, order, status, name, key)
if err != nil {
middleware.ErrorResponse(c, http.StatusOK, err.Error())
return
Expand Down
10 changes: 0 additions & 10 deletions service/aiproxy/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@ import (

func parsePageParams(c *gin.Context) (page, perPage int) {
page, _ = strconv.Atoi(c.Query("p"))
page--
if page < 0 {
page = 0
}

perPage, _ = strconv.Atoi(c.Query("per_page"))
if perPage <= 0 {
perPage = 10
} else if perPage > 100 {
perPage = 100
}
return
}
36 changes: 31 additions & 5 deletions service/aiproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
stdlog "log"
"net/http"
Expand Down Expand Up @@ -134,13 +135,36 @@ func setupHTTPServer() (*http.Server, *gin.Engine) {
}, server
}

func autoTestBannedModels() {
func autoTestBannedModels(ctx context.Context) {
log.Info("auto test banned models start")
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()

for range ticker.C {
controller.AutoTestBannedModels()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
controller.AutoTestBannedModels()
}
}
}

func cleanLog(ctx context.Context) {
log.Info("clean log start")
ticker := time.NewTicker(time.Minute * 15)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := model.CleanLog()
if err != nil {
log.Errorf("clean log failed: %s", err)
}
}
}
}

Expand All @@ -165,12 +189,14 @@ func main() {

go func() {
log.Infof("server started on http://localhost:%s", srv.Addr[1:])
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := srv.ListenAndServe(); err != nil &&
!errors.Is(err, http.ErrServerClosed) {
log.Fatal("failed to start HTTP server: " + err.Error())
}
}()

go autoTestBannedModels()
go autoTestBannedModels(ctx)
go cleanLog(ctx)

<-ctx.Done()

Expand Down
10 changes: 6 additions & 4 deletions service/aiproxy/model/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func GetAllChannels() (channels []*Channel, err error) {
return channels, err
}

func GetChannels(startIdx int, num int, id int, name string, key string, channelType int, baseURL string, order string) (channels []*Channel, total int64, err error) {
func GetChannels(page int, perPage int, id int, name string, key string, channelType int, baseURL string, order string) (channels []*Channel, total int64, err error) {
tx := DB.Model(&Channel{})
if id != 0 {
tx = tx.Where("id = ?", id)
Expand All @@ -176,11 +176,12 @@ func GetChannels(startIdx int, num int, id int, name string, key string, channel
if total <= 0 {
return nil, 0, nil
}
err = tx.Order(getChannelOrder(order)).Limit(num).Offset(startIdx).Find(&channels).Error
limit, offset := toLimitOffset(page, perPage)
err = tx.Order(getChannelOrder(order)).Limit(limit).Offset(offset).Find(&channels).Error
return channels, total, err
}

func SearchChannels(keyword string, startIdx int, num int, id int, name string, key string, channelType int, baseURL string, order string) (channels []*Channel, total int64, err error) {
func SearchChannels(keyword string, page int, perPage int, id int, name string, key string, channelType int, baseURL string, order string) (channels []*Channel, total int64, err error) {
tx := DB.Model(&Channel{})

// Handle exact match conditions for non-zero values
Expand Down Expand Up @@ -257,7 +258,8 @@ func SearchChannels(keyword string, startIdx int, num int, id int, name string,
if total <= 0 {
return nil, 0, nil
}
err = tx.Order(getChannelOrder(order)).Limit(num).Offset(startIdx).Find(&channels).Error
limit, offset := toLimitOffset(page, perPage)
err = tx.Order(getChannelOrder(order)).Limit(limit).Offset(offset).Find(&channels).Error
return channels, total, err
}

Expand Down
8 changes: 2 additions & 6 deletions service/aiproxy/model/consumeerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,8 @@ func SearchConsumeError(keyword string, requestID string, group string, tokenNam
return nil, 0, nil
}

page--
if page < 0 {
page = 0
}

var errors []*ConsumeError
err = tx.Order(getLogOrder(order)).Limit(perPage).Offset(page * perPage).Find(&errors).Error
limit, offset := toLimitOffset(page, perPage)
err = tx.Order(getLogOrder(order)).Limit(limit).Offset(offset).Find(&errors).Error
return errors, total, err
}
11 changes: 6 additions & 5 deletions service/aiproxy/model/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func getGroupOrder(order string) string {
}
}

func GetGroups(startIdx int, num int, order string, onlyDisabled bool) (groups []*Group, total int64, err error) {
func GetGroups(page int, perPage int, order string, onlyDisabled bool) (groups []*Group, total int64, err error) {
tx := DB.Model(&Group{})
if onlyDisabled {
tx = tx.Where("status = ?", GroupStatusDisabled)
Expand All @@ -69,8 +69,8 @@ func GetGroups(startIdx int, num int, order string, onlyDisabled bool) (groups [
if total <= 0 {
return nil, 0, nil
}

err = tx.Order(getGroupOrder(order)).Limit(num).Offset(startIdx).Find(&groups).Error
limit, offset := toLimitOffset(page, perPage)
err = tx.Order(getGroupOrder(order)).Limit(limit).Offset(offset).Find(&groups).Error
return groups, total, err
}

Expand Down Expand Up @@ -242,7 +242,7 @@ func UpdateGroupStatus(id string, status int) (err error) {
return HandleUpdateResult(result, ErrGroupNotFound)
}

func SearchGroup(keyword string, startIdx int, num int, order string, status int) (groups []*Group, total int64, err error) {
func SearchGroup(keyword string, page int, perPage int, order string, status int) (groups []*Group, total int64, err error) {
tx := DB.Model(&Group{})
if status != 0 {
tx = tx.Where("status = ?", status)
Expand All @@ -259,7 +259,8 @@ func SearchGroup(keyword string, startIdx int, num int, order string, status int
if total <= 0 {
return nil, 0, nil
}
err = tx.Order(getGroupOrder(order)).Limit(num).Offset(startIdx).Find(&groups).Error
limit, offset := toLimitOffset(page, perPage)
err = tx.Order(getGroupOrder(order)).Limit(limit).Offset(offset).Find(&groups).Error
return groups, total, err
}

Expand Down
53 changes: 28 additions & 25 deletions service/aiproxy/model/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/config"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -204,7 +203,28 @@ func GetGroupLogDetail(logID int, group string) (*RequestDetail, error) {
return &detail, nil
}

func cleanRequestDetail() error {
func CleanLog() error {
err := cleanLog()
if err != nil {
return err
}
return cleanLogDetail()
}

func cleanLog() error {
logStorageHours := config.GetLogStorageHours()
if logStorageHours <= 0 {
return nil
}
return LogDB.
Where(
"created_at < ?",
time.Now().Add(-time.Duration(logStorageHours)*time.Hour),
).
Delete(&Log{}).Error
}

func cleanLogDetail() error {
detailStorageHours := config.GetLogDetailStorageHours()
if detailStorageHours <= 0 {
return nil
Expand Down Expand Up @@ -237,15 +257,6 @@ func RecordConsumeLog(
ip string,
requestDetail *RequestDetail,
) error {
defer func() {
if requestDetail == nil {
return
}
err := cleanRequestDetail()
if err != nil {
log.Errorf("delete request detail failed: %s", err)
}
}()
log := &Log{
RequestID: requestID,
RequestAt: requestAt,
Expand Down Expand Up @@ -404,11 +415,6 @@ func getLogs(
})

g.Go(func() error {
page--
if page < 0 {
page = 0
}

query := buildGetLogsQuery(
group,
startTimestamp,
Expand All @@ -431,10 +437,11 @@ func getLogs(
})
}

limit, offset := toLimitOffset(page, perPage)
return query.
Order(getLogOrder(order)).
Limit(perPage).
Offset(page * perPage).
Limit(limit).
Offset(offset).
Find(&logs).Error
})

Expand Down Expand Up @@ -721,11 +728,6 @@ func searchLogs(
})

g.Go(func() error {
page--
if page < 0 {
page = 0
}

query := buildSearchLogsQuery(
group,
keyword,
Expand All @@ -750,10 +752,11 @@ func searchLogs(
})
}

limit, offset := toLimitOffset(page, perPage)
return query.
Order(getLogOrder(order)).
Limit(perPage).
Offset(page * perPage).
Limit(limit).
Offset(offset).
Find(&logs).Error
})

Expand Down
Loading