feat: enhance context handling and add options for redis locker

This commit is contained in:
2025-09-30 00:43:37 +03:00
parent ed7e7608fe
commit 328b4cc66e
4 changed files with 35 additions and 6 deletions

View File

@@ -34,14 +34,16 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<-
defer close(resultChan) defer close(resultChan)
lastId := "" lastId := ""
for { for {
if ctx.Err() != nil {
return
}
resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{ resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{
Filter: api.GetListOfProductsFilter{Visibility: "ALL"}, Filter: api.GetListOfProductsFilter{Visibility: "ALL"},
LastId: lastId, LastId: lastId,
Limit: 1000, Limit: 1000,
}) })
if err != nil { if err != nil {
// dev
//panic(err)
errChan <- fmt.Errorf("fetching product IDs: %w", err) errChan <- fmt.Errorf("fetching product IDs: %w", err)
return return
} }
@@ -70,11 +72,15 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if ctx.Err() != nil {
return
}
resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{ resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{
ProductId: productIds, ProductId: productIds,
}) })
if err != nil { if err != nil {
// dev
//panic(err) //panic(err)
errChan <- fmt.Errorf("fetching products: %w", err) errChan <- fmt.Errorf("fetching products: %w", err)
return return
@@ -143,7 +149,6 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
err = redis.ReadProtoMessage(ctx, key, &cachedMessage) err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
if err == nil && len(cachedMessage.Products) > 0 { if err == nil && len(cachedMessage.Products) > 0 {
resultChan <- utils.DerefSlice(cachedMessage.Products) resultChan <- utils.DerefSlice(cachedMessage.Products)
//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
return return
} }
@@ -161,6 +166,9 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
converter := generated.ConverterImpl{} converter := generated.ConverterImpl{}
var allProducts []PbProduct var allProducts []PbProduct
defer func() { defer func() {
if ctx.Err() != nil {
return
}
if len(allProducts) == 0 { if len(allProducts) == 0 {
return return
} }
@@ -189,6 +197,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
if !ok { if !ok {
return return
} }
case <-ctx.Done():
return
} }
} }
} }

View File

@@ -13,7 +13,9 @@ func InitLocker() error {
redisAddr := os.Getenv("REDIS_ADDR") redisAddr := os.Getenv("REDIS_ADDR")
password := os.Getenv("REDIS_PASSWORD") password := os.Getenv("REDIS_PASSWORD")
locker, err := rueidislock.NewLocker(rueidislock.LockerOption{ locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password}, ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
KeyMajority: 1,
NoLoopTracking: true,
}) })
if err != nil { if err != nil {
return err return err

View File

@@ -11,6 +11,7 @@ import (
wbapi "sipro-mps/pkg/api/wb/client" wbapi "sipro-mps/pkg/api/wb/client"
"sipro-mps/pkg/utils" "sipro-mps/pkg/utils"
"github.com/go-faster/errors"
"github.com/samber/lo" "github.com/samber/lo"
) )
@@ -61,6 +62,9 @@ func fetchProducts(
for { for {
response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")}) response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")})
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) {
return
}
currentRetry++ currentRetry++
if currentRetry >= maxRetries { if currentRetry >= maxRetries {
errChan <- fmt.Errorf("fetching product IDs: %w", err) errChan <- fmt.Errorf("fetching product IDs: %w", err)
@@ -131,6 +135,9 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan) go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
var allProducts []pb.Product var allProducts []pb.Product
defer func() { defer func() {
if ctx.Err() != nil {
return
}
if len(allProducts) == 0 { if len(allProducts) == 0 {
return return
} }
@@ -138,7 +145,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
Products: utils.ToPtrs(allProducts), Products: utils.ToPtrs(allProducts),
} }
_ = redis.WriteProtoMessage(ctx, key, &message) _ = redis.WriteProtoMessage(ctx, key, &message)
}() }()
for { for {
select { select {
@@ -157,6 +163,8 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
}) })
allProducts = append(allProducts, pbProducts...) allProducts = append(allProducts, pbProducts...)
resultChan <- pbProducts resultChan <- pbProducts
case <-ctx.Done():
return
} }
} }

View File

@@ -128,6 +128,9 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req
// all offers // all offers
var allOffers []*pb.GetProductsResponse_Offer var allOffers []*pb.GetProductsResponse_Offer
defer func() { defer func() {
if ctx.Err() != nil {
return
}
// store only if offers is 90% or more then requested // store only if offers is 90% or more then requested
if len(allOffers) < int(float64(len(req.OfferIds))*0.9) { if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
return return
@@ -243,10 +246,16 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace
offerChunks := lo.Chunk(req.Offers, defaultChunkSize) offerChunks := lo.Chunk(req.Offers, defaultChunkSize)
maxRetries := 5 maxRetries := 5
for chunkIndex, offerChunk := range offerChunks { for chunkIndex, offerChunk := range offerChunks {
if ctx.Err() != nil {
return
}
fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk)) fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk))
var globalError error = nil var globalError error = nil
for range maxRetries { for range maxRetries {
response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex) response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex)
if ctx.Err() != nil {
return
}
if err != nil { if err != nil {
globalError = err globalError = err
continue continue