diff --git a/api/generated/v1/ozon/products/products.pb.go b/api/generated/v1/ozon/products/products.pb.go index 73397c6..502acd5 100644 --- a/api/generated/v1/ozon/products/products.pb.go +++ b/api/generated/v1/ozon/products/products.pb.go @@ -24,6 +24,7 @@ const ( type GetListOfProductsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` + MarketplaceId int64 `protobuf:"varint,1,opt,name=marketplace_id,json=marketplaceId,proto3" json:"marketplace_id,omitempty"` // Unique identifier for the marketplace unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -58,6 +59,13 @@ func (*GetListOfProductsRequest) Descriptor() ([]byte, []int) { return file_ozon_products_proto_rawDescGZIP(), []int{0} } +func (x *GetListOfProductsRequest) GetMarketplaceId() int64 { + if x != nil { + return x.MarketplaceId + } + return 0 +} + type GetListOfProductsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Products []*Product `protobuf:"bytes,1,rep,name=products,proto3" json:"products,omitempty"` @@ -346,8 +354,9 @@ var File_ozon_products_proto protoreflect.FileDescriptor const file_ozon_products_proto_rawDesc = "" + "\n" + - "\x13ozon/products.proto\x12\rozon.products\x1a\x1fgoogle/protobuf/timestamp.proto\"\x1a\n" + - "\x18GetListOfProductsRequest\"O\n" + + "\x13ozon/products.proto\x12\rozon.products\x1a\x1fgoogle/protobuf/timestamp.proto\"A\n" + + "\x18GetListOfProductsRequest\x12%\n" + + "\x0emarketplace_id\x18\x01 \x01(\x03R\rmarketplaceId\"O\n" + "\x19GetListOfProductsResponse\x122\n" + "\bproducts\x18\x01 \x03(\v2\x16.ozon.products.ProductR\bproducts\"\xb3\x03\n" + "\aProduct\x12\x0e\n" + diff --git a/api/proto/v1 b/api/proto/v1 index b30d322..c6df89a 160000 --- a/api/proto/v1 +++ b/api/proto/v1 @@ -1 +1 @@ -Subproject commit b30d322774f089fc685bc184c39df1a0b3acfc5f +Subproject commit c6df89a7a355ea20f9b573874d808c5df29acfdb diff --git a/cmd/server/main.go b/cmd/server/main.go index 472df44..8150509 100755 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -3,7 +3,7 @@ package main import ( "context" "fmt" - "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "google.golang.org/grpc" "net" @@ -31,7 +31,7 @@ func logMessage(level string, format string, a ...interface{}) { } } -func createGrpcServer() { +func createGrpcServer(pool *pgxpool.Pool) { lis, err := net.Listen("tcp", ":8080") if err != nil { @@ -40,7 +40,7 @@ func createGrpcServer() { } grpcServer := grpc.NewServer() - repo, err := marketplace.RegisterAdapterGRPC(grpcServer) + repo, err := marketplace.RegisterAdapterGRPC(grpcServer, pool) if err != nil { fmt.Printf("failed to register gRPC server: %v\n", err) return @@ -82,22 +82,15 @@ func main() { logMessage("info", "Redis client initialized successfully. 🟥") // Initializing pgx connection - conn, err := pgx.Connect(ctx, os.Getenv("POSTGRES_URL")) + dbpool, err := pgxpool.New(ctx, os.Getenv("POSTGRES_URL")) if err != nil { logMessage("error", "Failed to connect to PostgreSQL: %v", err) return } - defer conn.Close(ctx) + defer dbpool.Close() logMessage("info", "Connected to PostgreSQL successfully. 🐘") - createGrpcServer() - // ------------------ shitting - //mpRepo := marketplace.NewDBRepository(conn) - // - //productsRepo := products.NewAPIRepository( - // mpRepo) - //_, err = productsRepo.GetAllProducts(ctx, 262) - //if err != nil { - // return + createGrpcServer(dbpool) + } //for _, item := range items { diff --git a/internal/marketplace/adapter_grpc.go b/internal/marketplace/adapter_grpc.go index 73209f5..e7afe59 100644 --- a/internal/marketplace/adapter_grpc.go +++ b/internal/marketplace/adapter_grpc.go @@ -2,11 +2,11 @@ package marketplace import ( "context" - "github.com/jackc/pgx/v5" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" pb "sipro-mps/api/generated/v1/marketplace" + "sipro-mps/internal/marketplace/db" ) // AdapterGRPC implements the gRPC server for the Marketplace service. @@ -20,11 +20,11 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC { repo: repo, } } -func RegisterAdapterGRPC(server *grpc.Server) (*Repository, error) { - conn, err := pgx.Connect(context.Background(), "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql") - if err != nil { - return nil, err - } +func RegisterAdapterGRPC(server *grpc.Server, conn db.DBTX) (*Repository, error) { + //conn, err := pgx.Connect(context.Background(), "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql") + //if err != nil { + // return nil, err + //} repo := NewDBRepository(conn) adapter := NewAdapterGRPC(repo) pb.RegisterMarketplaceServiceServer(server, adapter) diff --git a/internal/marketplace/repository_db.go b/internal/marketplace/repository_db.go index 9bbea84..857f7c8 100644 --- a/internal/marketplace/repository_db.go +++ b/internal/marketplace/repository_db.go @@ -2,15 +2,14 @@ package marketplace import ( "context" - "github.com/jackc/pgx/v5" "sipro-mps/internal/marketplace/db" ) type dbRepository struct { - conn *pgx.Conn + conn db.DBTX } -func NewDBRepository(conn *pgx.Conn) Repository { +func NewDBRepository(conn db.DBTX) Repository { return &dbRepository{conn: conn} } diff --git a/internal/ozon/products/adapter_grpc.go b/internal/ozon/products/adapter_grpc.go index 0f7f944..7afad8c 100644 --- a/internal/ozon/products/adapter_grpc.go +++ b/internal/ozon/products/adapter_grpc.go @@ -1,6 +1,7 @@ package products import ( + "fmt" "github.com/samber/lo" "google.golang.org/grpc" pb "sipro-mps/api/generated/v1/ozon/products" @@ -26,20 +27,23 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Reposi pb.RegisterProductsServiceServer(server, adapter) return &apiRepo, nil } - func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error { ctx := stream.Context() + fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId) converter := generated.ConverterImpl{} resultChan := make(chan []OzonProduct) errChan := make(chan error) - g.repo.StreamAllProducts(ctx, 262, resultChan, errChan) + + go g.repo.StreamAllProducts(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине + for { select { case <-ctx.Done(): - return ctx.Err() // Handle context cancellation + fmt.Println("GetListOfProducts: context cancelled or deadline exceeded:", ctx.Err()) + return ctx.Err() case products, ok := <-resultChan: if !ok { - return nil + return nil // Завершаем, только если результат закрыт } protoProducts := lo.Map(products, func(product OzonProduct, _ int) *pb.Product { return converter.ToProto(&product) @@ -48,13 +52,12 @@ func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream Products: protoProducts, } if err := stream.Send(resp); err != nil { - return err // Error sending response + fmt.Println("GetListOfProducts: error sending response:", err) + return err } case err, ok := <-errChan: - if !ok { - return nil // Exit loop when errChan is closed - } - if err != nil { + if ok && err != nil { + fmt.Println("GetListOfProducts: error received from channel:", err) return err } } diff --git a/internal/ozon/products/repository_api.go b/internal/ozon/products/repository_api.go index 988995a..e946a3e 100644 --- a/internal/ozon/products/repository_api.go +++ b/internal/ozon/products/repository_api.go @@ -31,8 +31,8 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<- }) if err != nil { // dev - panic(err) - //errChan <- fmt.Errorf("fetching product IDs: %w", err) + //panic(err) + errChan <- fmt.Errorf("fetching product IDs: %w", err) return } @@ -65,8 +65,8 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha }) if err != nil { // dev - panic(err) - //errChan <- fmt.Errorf("fetching products: %w", err) + //panic(err) + errChan <- fmt.Errorf("fetching products: %w", err) return } items := resp.Items diff --git a/internal/ozon/rate_limiter.go b/internal/ozon/rate_limiter.go index 4f7e9af..9ab61f1 100644 --- a/internal/ozon/rate_limiter.go +++ b/internal/ozon/rate_limiter.go @@ -43,7 +43,6 @@ type RateLimitTransport struct { } func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) { - fmt.Println(time.Now().Format("2006-01-02 15:04:05")) ctx := req.Context() clientId := req.Header.Get("Client-Id") now := time.Now().UnixNano() @@ -57,6 +56,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error return nil, fmt.Errorf("failed to execute rate limit script: %w", err) } if waitTime > 0 { + fmt.Printf("Rate limit exceeded for client %s, waiting for %d nanoseconds\n", clientId, waitTime) time.Sleep(time.Duration(waitTime)) } return t.RoundTripper.RoundTrip(req)