diff --git a/internal/ozon/products/repository_api.go b/internal/ozon/products/repository_api.go index cb2bb30..8bf6b6f 100644 --- a/internal/ozon/products/repository_api.go +++ b/internal/ozon/products/repository_api.go @@ -34,14 +34,16 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<- defer close(resultChan) lastId := "" for { + if ctx.Err() != nil { + return + } resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{ Filter: api.GetListOfProductsFilter{Visibility: "ALL"}, LastId: lastId, Limit: 1000, }) + if err != nil { - // dev - //panic(err) errChan <- fmt.Errorf("fetching product IDs: %w", err) return } @@ -70,11 +72,15 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha wg.Add(1) go func() { defer wg.Done() + if ctx.Err() != nil { + return + } resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{ ProductId: productIds, }) + if err != nil { - // dev + //panic(err) errChan <- fmt.Errorf("fetching products: %w", err) return @@ -143,7 +149,6 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI err = redis.ReadProtoMessage(ctx, key, &cachedMessage) if err == nil && len(cachedMessage.Products) > 0 { resultChan <- utils.DerefSlice(cachedMessage.Products) - //_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId) return } @@ -161,6 +166,9 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI converter := generated.ConverterImpl{} var allProducts []PbProduct defer func() { + if ctx.Err() != nil { + return + } if len(allProducts) == 0 { return } @@ -189,6 +197,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI if !ok { return } + case <-ctx.Done(): + return } } } diff --git a/internal/redis/lock.go b/internal/redis/lock.go index 58eb078..4aec9c7 100644 --- a/internal/redis/lock.go +++ b/internal/redis/lock.go @@ -13,7 +13,9 @@ func InitLocker() error { redisAddr := os.Getenv("REDIS_ADDR") password := os.Getenv("REDIS_PASSWORD") locker, err := rueidislock.NewLocker(rueidislock.LockerOption{ - ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password}, + ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password}, + KeyMajority: 1, + NoLoopTracking: true, }) if err != nil { return err diff --git a/internal/wb/products/repository_api.go b/internal/wb/products/repository_api.go index 1f694b4..873bc69 100644 --- a/internal/wb/products/repository_api.go +++ b/internal/wb/products/repository_api.go @@ -11,6 +11,7 @@ import ( wbapi "sipro-mps/pkg/api/wb/client" "sipro-mps/pkg/utils" + "github.com/go-faster/errors" "github.com/samber/lo" ) @@ -61,6 +62,9 @@ func fetchProducts( for { response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")}) if err != nil { + if errors.Is(err, context.Canceled) { + return + } currentRetry++ if currentRetry >= maxRetries { errChan <- fmt.Errorf("fetching product IDs: %w", err) @@ -131,6 +135,9 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan) var allProducts []pb.Product defer func() { + if ctx.Err() != nil { + return + } if len(allProducts) == 0 { return } @@ -138,7 +145,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId Products: utils.ToPtrs(allProducts), } _ = redis.WriteProtoMessage(ctx, key, &message) - }() for { select { @@ -157,6 +163,8 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId }) allProducts = append(allProducts, pbProducts...) resultChan <- pbProducts + case <-ctx.Done(): + return } } diff --git a/internal/ym/products/repository_api.go b/internal/ym/products/repository_api.go index 87baf1d..a6011d5 100644 --- a/internal/ym/products/repository_api.go +++ b/internal/ym/products/repository_api.go @@ -128,6 +128,9 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req // all offers var allOffers []*pb.GetProductsResponse_Offer defer func() { + if ctx.Err() != nil { + return + } // store only if offers is 90% or more then requested if len(allOffers) < int(float64(len(req.OfferIds))*0.9) { return @@ -243,10 +246,16 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace offerChunks := lo.Chunk(req.Offers, defaultChunkSize) maxRetries := 5 for chunkIndex, offerChunk := range offerChunks { + if ctx.Err() != nil { + return + } fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk)) var globalError error = nil for range maxRetries { response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex) + if ctx.Err() != nil { + return + } if err != nil { globalError = err continue