From d1e7f4e0f4abbbdcb9b86c98f532e8dbf3d74aef Mon Sep 17 00:00:00 2001 From: admin Date: Fri, 5 Sep 2025 05:09:14 +0300 Subject: [PATCH] feat: implement locking mechanism for marketplace operations to prevent race conditions --- internal/ozon/products/repository_api.go | 50 ++++++++++++++++++++++++ internal/wb/products/repository_api.go | 9 ++++- internal/ym/products/repository_api.go | 15 ++++++- 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/internal/ozon/products/repository_api.go b/internal/ozon/products/repository_api.go index 936c8af..80fd842 100644 --- a/internal/ozon/products/repository_api.go +++ b/internal/ozon/products/repository_api.go @@ -149,6 +149,12 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI return } + locker := *redis.Locker + _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key)) + if err != nil { + return + } + defer cancel() innerResultChan := make(chan []OzonProduct) innerErrChan := make(chan error) @@ -198,11 +204,24 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market errChan <- err return } + identifier, err := mp.GetIdentifier() + if err != nil { + errChan <- fmt.Errorf("getting marketplace identifier: %w", err) + return + } ozonClient, err := ozon.GetClientFromMarketplace(mp) if err != nil { errChan <- err return } + key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier) + locker := *redis.Locker + _, cancel, err := locker.WithContext(ctx, key) + if err != nil { + return + } + defer cancel() + converter := generated.ConverterImpl{} for _, chunk := range lo.Chunk(productIds, 1000) { @@ -235,10 +254,26 @@ func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, i return nil, err } + + identifier, err := mp.GetIdentifier() + if err != nil { + return nil, fmt.Errorf("getting marketplace identifier: %w", err) + } + ozonClient, err := ozon.GetClientFromMarketplace(mp) if err != nil { return nil, err } + + key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier) + locker := *redis.Locker + _, cancel, err := locker.WithContext(ctx, key) + if err != nil { + return nil, err + + } + defer cancel() + // Step 1: map the items into a slice mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem { return &PbDeleteProductResponseItem{ @@ -293,6 +328,21 @@ func (a *apiRepository) CreateOrUpdateProducts(ctx context.Context, marketplaceI if err != nil { return nil, err } + identifier, err := mp.GetIdentifier() + if err != nil { + return nil, fmt.Errorf("getting marketplace identifier: %w", err) + + } + + key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier) + locker := *redis.Locker + _, cancel, err := locker.WithContext(ctx, key) + if err != nil { + return nil, err + + } + defer cancel() + converter := generated.ConverterImpl{} pageSize := 100 result := make([]int64, (len(items)+pageSize-1)/pageSize) diff --git a/internal/wb/products/repository_api.go b/internal/wb/products/repository_api.go index 37b137a..50f4b37 100644 --- a/internal/wb/products/repository_api.go +++ b/internal/wb/products/repository_api.go @@ -101,6 +101,7 @@ func fetchProducts( func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []pb.Product, errChan chan<- error) { defer close(resultChan) defer close(errChan) + mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId) if err != nil { errChan <- fmt.Errorf("getting marketplace by ID: %w", err) @@ -111,9 +112,7 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId errChan <- fmt.Errorf("getting marketplace identifier: %w", err) return } - converter := generated.ConverterImpl{} - key := GetProductsKey(identifier) var cachedMessage pb.GetProductsResponse err = redis.ReadProtoMessage(ctx, key, &cachedMessage) @@ -122,6 +121,12 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId _ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId) return } + locker := *redis.Locker + _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("wb:products:marketplace:%s:lock", key)) + if err != nil { + return + } + defer cancel() innerResultChan := make(chan []WbProduct) innerErrChan := make(chan error) diff --git a/internal/ym/products/repository_api.go b/internal/ym/products/repository_api.go index da42e96..87baf1d 100644 --- a/internal/ym/products/repository_api.go +++ b/internal/ym/products/repository_api.go @@ -117,6 +117,13 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req resultChan <- cachedMessage.Offers return } + locker := *redis.Locker + _, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ym:products:marketplace:%d:lock", businessID)) + if err != nil { + return + } + defer cancel() + r.setOfferMappingsRateLimit(businessID) // all offers var allOffers []*pb.GetProductsResponse_Offer @@ -213,11 +220,17 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace defer close(resultChan) defer close(errChan) - _, client, _, err := r.setupMarketplaceClient(ctx, marketplaceID) + _, client, businessId, err := r.setupMarketplaceClient(ctx, marketplaceID) if err != nil { errChan <- err return } + locker := *redis.Locker + _, cancel, err := locker.WithContext(ctx, fmt.Sprintf("ym:tariffs:marketplace:%d:lock", businessId)) + if err != nil { + return + } + defer cancel() r.setTariffsRateLimit()