diff --git a/internal/ozon/products/adapter_grpc.go b/internal/ozon/products/adapter_grpc.go index b3d53db..9845887 100644 --- a/internal/ozon/products/adapter_grpc.go +++ b/internal/ozon/products/adapter_grpc.go @@ -29,7 +29,7 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Reposi func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error { ctx := stream.Context() fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId) - resultChan := make(chan []pb.Product) + resultChan := make(chan []pb.Product, 10) errChan := make(chan error) go g.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине diff --git a/internal/tasks/ozon/ozon.go b/internal/tasks/ozon/ozon.go index e2195f7..447eb64 100644 --- a/internal/tasks/ozon/ozon.go +++ b/internal/tasks/ozon/ozon.go @@ -23,23 +23,23 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta var payload types.FetchProductsTask err := payload.Unmarshal(task) if err != nil { - return asynq.SkipRetry + return nil } marketplaceRepo := marketplace.NewDBRepository(p.Dbpool) marketplaceById, err := marketplaceRepo.GetMarketplaceByID(ctx, payload.MarketplaceId) if err != nil { - return asynq.SkipRetry + return nil } identifier, err := marketplaceById.GetIdentifier() if err != nil { - return asynq.SkipRetry + return nil } lockKey := fmt.Sprintf("ozon:products:marketplace:%s:lock", identifier) locker := *redis.Locker _, cancel, err := locker.TryWithContext(ctx, lockKey) if err != nil { fmt.Printf("Failed to acquire lock for marketplace %s: %v\n", identifier, err) - return asynq.SkipRetry + return nil } defer cancel() ozonRepo := products.NewAPIRepository(marketplaceRepo) @@ -54,7 +54,7 @@ func (p *FetchProductsProcessor) ProcessTask(ctx context.Context, task *asynq.Ta }) productsJson, err := json.Marshal(productsProto) if err != nil { - return asynq.SkipRetry + return nil } redisClient := *redis.Client productsKey := fmt.Sprintf("ozon:products:%s", identifier) diff --git a/internal/wb/products/adapter_grpc.go b/internal/wb/products/adapter_grpc.go index 0d50374..e34f611 100644 --- a/internal/wb/products/adapter_grpc.go +++ b/internal/wb/products/adapter_grpc.go @@ -27,7 +27,7 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplacesRepository marketplace } func (a *AdapterGRPC) GetProducts(req *pb.GetProductsRequest, stream pb.ProductsService_GetProductsServer) error { ctx := stream.Context() - resultChan := make(chan []pb.Product) + resultChan := make(chan []pb.Product, 10) errChan := make(chan error) go a.repo.StreamAllProductsCache(ctx, int(req.MarketplaceId), resultChan, errChan) for {