diff --git a/internal/ym/products/repository_api.go b/internal/ym/products/repository_api.go index a6011d5..a518560 100644 --- a/internal/ym/products/repository_api.go +++ b/internal/ym/products/repository_api.go @@ -101,58 +101,6 @@ func (r *apiRepository) fetchBusinessIDFromCampaigns(ctx context.Context, client } // GetProducts retrieves products from Yandex Market API in chunks and sends results to channels -func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req *pb.GetProductsRequest, resultChan chan<- []*pb.GetProductsResponse_Offer, errChan chan<- error) { - defer close(resultChan) - defer close(errChan) - - _, client, businessID, err := r.setupMarketplaceClient(ctx, marketplaceID) - if err != nil { - errChan <- err - return - } - key := GetProductKey(strconv.Itoa(int(businessID))) - var cachedMessage pb.GetProductsResponse - err = redis.ReadProtoMessage(ctx, key, &cachedMessage) - if err == nil && len(cachedMessage.Offers) > 0 { - resultChan <- cachedMessage.Offers - return - } - locker := *redis.Locker - _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ym:products:marketplace:%d:lock", businessID)) - if err != nil { - return - } - defer cancel() - - r.setOfferMappingsRateLimit(businessID) - // all offers - var allOffers []*pb.GetProductsResponse_Offer - defer func() { - if ctx.Err() != nil { - return - } - // store only if offers is 90% or more then requested - if len(allOffers) < int(float64(len(req.OfferIds))*0.9) { - return - } - if len(allOffers) == 0 { - return - } - message := pb.GetProductsResponse{ - Offers: allOffers, - } - _ = redis.WriteProtoMessage(ctx, key, &message) - }() - for _, chunk := range lo.Chunk(req.OfferIds, defaultChunkSize) { - offers, err := r.fetchOfferMappings(ctx, client, businessID, chunk) - if err != nil { - errChan <- err - return - } - resultChan <- offers - allOffers = append(allOffers, offers...) - } -} // setupMarketplaceClient initializes marketplace, API client, and business ID func (r *apiRepository) setupMarketplaceClient(ctx context.Context, marketplaceID int) (*marketplace.Marketplace, *ymclient.APIClient, int64, error) { @@ -391,3 +339,55 @@ func (r *apiRepository) convertResponseToProto(response *ymclient.CalculateTarif result.Offers = offers return result } +func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req *pb.GetProductsRequest, resultChan chan<- []*pb.GetProductsResponse_Offer, errChan chan<- error) { + defer close(resultChan) + defer close(errChan) + + _, client, businessID, err := r.setupMarketplaceClient(ctx, marketplaceID) + if err != nil { + errChan <- err + return + } + key := GetProductKey(strconv.Itoa(int(businessID)) + strconv.Itoa(marketplaceID)) + var cachedMessage pb.GetProductsResponse + err = redis.ReadProtoMessage(ctx, key, &cachedMessage) + if err == nil && len(cachedMessage.Offers) > 0 { + resultChan <- cachedMessage.Offers + return + } + locker := *redis.Locker + _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ym:products:marketplace:%d:lock", businessID)) + if err != nil { + return + } + defer cancel() + + r.setOfferMappingsRateLimit(businessID) + // all offers + var allOffers []*pb.GetProductsResponse_Offer + defer func() { + if ctx.Err() != nil { + return + } + // store only if offers is 90% or more then requested + if len(allOffers) < int(float64(len(req.OfferIds))*0.9) { + return + } + if len(allOffers) == 0 { + return + } + message := pb.GetProductsResponse{ + Offers: allOffers, + } + _ = redis.WriteProtoMessage(ctx, key, &message) + }() + for _, chunk := range lo.Chunk(req.OfferIds, defaultChunkSize) { + offers, err := r.fetchOfferMappings(ctx, client, businessID, chunk) + if err != nil { + errChan <- err + return + } + resultChan <- offers + allOffers = append(allOffers, offers...) + } +}