feat: implement locking mechanism for marketplace operations to prevent race conditions
This commit is contained in:
		@@ -149,6 +149,12 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	innerResultChan := make(chan []OzonProduct)
 | 
						innerResultChan := make(chan []OzonProduct)
 | 
				
			||||||
	innerErrChan := make(chan error)
 | 
						innerErrChan := make(chan error)
 | 
				
			||||||
@@ -198,11 +204,24 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market
 | 
				
			|||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						identifier, err := mp.GetIdentifier()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
						ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier)
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.WithContext(ctx, key)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	converter := generated.ConverterImpl{}
 | 
						converter := generated.ConverterImpl{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, chunk := range lo.Chunk(productIds, 1000) {
 | 
						for _, chunk := range lo.Chunk(productIds, 1000) {
 | 
				
			||||||
@@ -235,10 +254,26 @@ func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, i
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						identifier, err := mp.GetIdentifier()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
						ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier)
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.WithContext(ctx, key)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Step 1: map the items into a slice
 | 
						// Step 1: map the items into a slice
 | 
				
			||||||
	mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem {
 | 
						mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem {
 | 
				
			||||||
		return &PbDeleteProductResponseItem{
 | 
							return &PbDeleteProductResponseItem{
 | 
				
			||||||
@@ -293,6 +328,21 @@ func (a *apiRepository) CreateOrUpdateProducts(ctx context.Context, marketplaceI
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						identifier, err := mp.GetIdentifier()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier)
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.WithContext(ctx, key)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	converter := generated.ConverterImpl{}
 | 
						converter := generated.ConverterImpl{}
 | 
				
			||||||
	pageSize := 100
 | 
						pageSize := 100
 | 
				
			||||||
	result := make([]int64, (len(items)+pageSize-1)/pageSize)
 | 
						result := make([]int64, (len(items)+pageSize-1)/pageSize)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -101,6 +101,7 @@ func fetchProducts(
 | 
				
			|||||||
func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []pb.Product, errChan chan<- error) {
 | 
					func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []pb.Product, errChan chan<- error) {
 | 
				
			||||||
	defer close(resultChan)
 | 
						defer close(resultChan)
 | 
				
			||||||
	defer close(errChan)
 | 
						defer close(errChan)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
 | 
						mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- fmt.Errorf("getting marketplace by ID: %w", err)
 | 
							errChan <- fmt.Errorf("getting marketplace by ID: %w", err)
 | 
				
			||||||
@@ -111,9 +112,7 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
 | 
				
			|||||||
		errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
							errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	converter := generated.ConverterImpl{}
 | 
						converter := generated.ConverterImpl{}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	key := GetProductsKey(identifier)
 | 
						key := GetProductsKey(identifier)
 | 
				
			||||||
	var cachedMessage pb.GetProductsResponse
 | 
						var cachedMessage pb.GetProductsResponse
 | 
				
			||||||
	err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
						err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
				
			||||||
@@ -122,6 +121,12 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
 | 
				
			|||||||
		_ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
 | 
							_ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("wb:products:marketplace:%s:lock", key))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	innerResultChan := make(chan []WbProduct)
 | 
						innerResultChan := make(chan []WbProduct)
 | 
				
			||||||
	innerErrChan := make(chan error)
 | 
						innerErrChan := make(chan error)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -117,6 +117,13 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req
 | 
				
			|||||||
		resultChan <- cachedMessage.Offers
 | 
							resultChan <- cachedMessage.Offers
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ym:products:marketplace:%d:lock", businessID))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.setOfferMappingsRateLimit(businessID)
 | 
						r.setOfferMappingsRateLimit(businessID)
 | 
				
			||||||
	// all offers
 | 
						// all offers
 | 
				
			||||||
	var allOffers []*pb.GetProductsResponse_Offer
 | 
						var allOffers []*pb.GetProductsResponse_Offer
 | 
				
			||||||
@@ -213,11 +220,17 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace
 | 
				
			|||||||
	defer close(resultChan)
 | 
						defer close(resultChan)
 | 
				
			||||||
	defer close(errChan)
 | 
						defer close(errChan)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, client, _, err := r.setupMarketplaceClient(ctx, marketplaceID)
 | 
						_, client, businessId, err := r.setupMarketplaceClient(ctx, marketplaceID)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						locker := *redis.Locker
 | 
				
			||||||
 | 
						_, cancel, err := locker.WithContext(ctx, fmt.Sprintf("ym:tariffs:marketplace:%d:lock", businessId))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.setTariffsRateLimit()
 | 
						r.setTariffsRateLimit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user