feat: ym net key
This commit is contained in:
@@ -101,58 +101,6 @@ func (r *apiRepository) fetchBusinessIDFromCampaigns(ctx context.Context, client
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetProducts retrieves products from Yandex Market API in chunks and sends results to channels
|
// GetProducts retrieves products from Yandex Market API in chunks and sends results to channels
|
||||||
func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req *pb.GetProductsRequest, resultChan chan<- []*pb.GetProductsResponse_Offer, errChan chan<- error) {
|
|
||||||
defer close(resultChan)
|
|
||||||
defer close(errChan)
|
|
||||||
|
|
||||||
_, client, businessID, err := r.setupMarketplaceClient(ctx, marketplaceID)
|
|
||||||
if err != nil {
|
|
||||||
errChan <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key := GetProductKey(strconv.Itoa(int(businessID)))
|
|
||||||
var cachedMessage pb.GetProductsResponse
|
|
||||||
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
|
|
||||||
if err == nil && len(cachedMessage.Offers) > 0 {
|
|
||||||
resultChan <- cachedMessage.Offers
|
|
||||||
return
|
|
||||||
}
|
|
||||||
locker := *redis.Locker
|
|
||||||
_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ym:products:marketplace:%d:lock", businessID))
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
r.setOfferMappingsRateLimit(businessID)
|
|
||||||
// all offers
|
|
||||||
var allOffers []*pb.GetProductsResponse_Offer
|
|
||||||
defer func() {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// store only if offers is 90% or more then requested
|
|
||||||
if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(allOffers) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
message := pb.GetProductsResponse{
|
|
||||||
Offers: allOffers,
|
|
||||||
}
|
|
||||||
_ = redis.WriteProtoMessage(ctx, key, &message)
|
|
||||||
}()
|
|
||||||
for _, chunk := range lo.Chunk(req.OfferIds, defaultChunkSize) {
|
|
||||||
offers, err := r.fetchOfferMappings(ctx, client, businessID, chunk)
|
|
||||||
if err != nil {
|
|
||||||
errChan <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resultChan <- offers
|
|
||||||
allOffers = append(allOffers, offers...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// setupMarketplaceClient initializes marketplace, API client, and business ID
|
// setupMarketplaceClient initializes marketplace, API client, and business ID
|
||||||
func (r *apiRepository) setupMarketplaceClient(ctx context.Context, marketplaceID int) (*marketplace.Marketplace, *ymclient.APIClient, int64, error) {
|
func (r *apiRepository) setupMarketplaceClient(ctx context.Context, marketplaceID int) (*marketplace.Marketplace, *ymclient.APIClient, int64, error) {
|
||||||
@@ -391,3 +339,55 @@ func (r *apiRepository) convertResponseToProto(response *ymclient.CalculateTarif
|
|||||||
result.Offers = offers
|
result.Offers = offers
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req *pb.GetProductsRequest, resultChan chan<- []*pb.GetProductsResponse_Offer, errChan chan<- error) {
|
||||||
|
defer close(resultChan)
|
||||||
|
defer close(errChan)
|
||||||
|
|
||||||
|
_, client, businessID, err := r.setupMarketplaceClient(ctx, marketplaceID)
|
||||||
|
if err != nil {
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := GetProductKey(strconv.Itoa(int(businessID)) + strconv.Itoa(marketplaceID))
|
||||||
|
var cachedMessage pb.GetProductsResponse
|
||||||
|
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
|
||||||
|
if err == nil && len(cachedMessage.Offers) > 0 {
|
||||||
|
resultChan <- cachedMessage.Offers
|
||||||
|
return
|
||||||
|
}
|
||||||
|
locker := *redis.Locker
|
||||||
|
_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ym:products:marketplace:%d:lock", businessID))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
r.setOfferMappingsRateLimit(businessID)
|
||||||
|
// all offers
|
||||||
|
var allOffers []*pb.GetProductsResponse_Offer
|
||||||
|
defer func() {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// store only if offers is 90% or more then requested
|
||||||
|
if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(allOffers) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
message := pb.GetProductsResponse{
|
||||||
|
Offers: allOffers,
|
||||||
|
}
|
||||||
|
_ = redis.WriteProtoMessage(ctx, key, &message)
|
||||||
|
}()
|
||||||
|
for _, chunk := range lo.Chunk(req.OfferIds, defaultChunkSize) {
|
||||||
|
offers, err := r.fetchOfferMappings(ctx, client, businessID, chunk)
|
||||||
|
if err != nil {
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resultChan <- offers
|
||||||
|
allOffers = append(allOffers, offers...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user