Increase buffer size for result channel in gRPC product retrieval; modify error handling to return nil instead of skipping retry
This commit is contained in:
		@@ -29,7 +29,7 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Reposi
 | 
				
			|||||||
func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
 | 
					func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
 | 
				
			||||||
	ctx := stream.Context()
 | 
						ctx := stream.Context()
 | 
				
			||||||
	fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId)
 | 
						fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId)
 | 
				
			||||||
	resultChan := make(chan []pb.Product)
 | 
						resultChan := make(chan []pb.Product, 10)
 | 
				
			||||||
	errChan := make(chan error)
 | 
						errChan := make(chan error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go g.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине
 | 
						go g.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -23,23 +23,23 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta
 | 
				
			|||||||
	var payload types.FetchProductsTask
 | 
						var payload types.FetchProductsTask
 | 
				
			||||||
	err := payload.Unmarshal(task)
 | 
						err := payload.Unmarshal(task)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return asynq.SkipRetry
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	marketplaceRepo := marketplace.NewDBRepository(p.Dbpool)
 | 
						marketplaceRepo := marketplace.NewDBRepository(p.Dbpool)
 | 
				
			||||||
	marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId)
 | 
						marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return asynq.SkipRetry
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	identifier, err := marketplaceById.GetIdentifier()
 | 
						identifier, err := marketplaceById.GetIdentifier()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return asynq.SkipRetry
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	lockKey := fmt.Sprintf("ozon:products:marketplace:%s:lock", identifier)
 | 
						lockKey := fmt.Sprintf("ozon:products:marketplace:%s:lock", identifier)
 | 
				
			||||||
	locker := *redis.Locker
 | 
						locker := *redis.Locker
 | 
				
			||||||
	_, cancel, err := locker.TryWithContext(ctx, lockKey)
 | 
						_, cancel, err := locker.TryWithContext(ctx, lockKey)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", identifier, err)
 | 
							fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", identifier, err)
 | 
				
			||||||
		return asynq.SkipRetry
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	defer cancel()
 | 
						defer cancel()
 | 
				
			||||||
	ozonRepo := products.NewAPIRepository(marketplaceRepo)
 | 
						ozonRepo := products.NewAPIRepository(marketplaceRepo)
 | 
				
			||||||
@@ -54,7 +54,7 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta
 | 
				
			|||||||
	})
 | 
						})
 | 
				
			||||||
	productsJson, err := json.Marshal(productsProto)
 | 
						productsJson, err := json.Marshal(productsProto)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return asynq.SkipRetry
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	redisClient := *redis.Client
 | 
						redisClient := *redis.Client
 | 
				
			||||||
	productsKey := fmt.Sprintf("ozon:products:%s", identifier)
 | 
						productsKey := fmt.Sprintf("ozon:products:%s", identifier)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,7 +27,7 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplacesRepository marketplace
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
func (a *AdapterGRPC) GetProducts(req *pb.GetProductsRequest, stream pb.ProductsService_GetProductsServer) error {
 | 
					func (a *AdapterGRPC) GetProducts(req *pb.GetProductsRequest, stream pb.ProductsService_GetProductsServer) error {
 | 
				
			||||||
	ctx := stream.Context()
 | 
						ctx := stream.Context()
 | 
				
			||||||
	resultChan := make(chan []pb.Product)
 | 
						resultChan := make(chan []pb.Product, 10)
 | 
				
			||||||
	errChan := make(chan error)
 | 
						errChan := make(chan error)
 | 
				
			||||||
	go a.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan)
 | 
						go a.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan)
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user