Refactor gRPC adapter to use connection pool and improve error handling in product retrieval

This commit is contained in:
2025-05-27 17:50:20 +03:00
parent b083cccc09
commit b48421e653
8 changed files with 44 additions and 40 deletions

View File

@@ -24,6 +24,7 @@ const (
type GetListOfProductsRequest struct { type GetListOfProductsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
MarketplaceId int64 `protobuf:"varint,1,opt,name=marketplace_id,json=marketplaceId,proto3" json:"marketplace_id,omitempty"` // Unique identifier for the marketplace
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@@ -58,6 +59,13 @@ func (*GetListOfProductsRequest) Descriptor() ([]byte, []int) {
return file_ozon_products_proto_rawDescGZIP(), []int{0} return file_ozon_products_proto_rawDescGZIP(), []int{0}
} }
func (x *GetListOfProductsRequest) GetMarketplaceId() int64 {
if x != nil {
return x.MarketplaceId
}
return 0
}
type GetListOfProductsResponse struct { type GetListOfProductsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Products []*Product `protobuf:"bytes,1,rep,name=products,proto3" json:"products,omitempty"` Products []*Product `protobuf:"bytes,1,rep,name=products,proto3" json:"products,omitempty"`
@@ -346,8 +354,9 @@ var File_ozon_products_proto protoreflect.FileDescriptor
const file_ozon_products_proto_rawDesc = "" + const file_ozon_products_proto_rawDesc = "" +
"\n" + "\n" +
"\x13ozon/products.proto\x12\rozon.products\x1a\x1fgoogle/protobuf/timestamp.proto\"\x1a\n" + "\x13ozon/products.proto\x12\rozon.products\x1a\x1fgoogle/protobuf/timestamp.proto\"A\n" +
"\x18GetListOfProductsRequest\"O\n" + "\x18GetListOfProductsRequest\x12%\n" +
"\x0emarketplace_id\x18\x01 \x01(\x03R\rmarketplaceId\"O\n" +
"\x19GetListOfProductsResponse\x122\n" + "\x19GetListOfProductsResponse\x122\n" +
"\bproducts\x18\x01 \x03(\v2\x16.ozon.products.ProductR\bproducts\"\xb3\x03\n" + "\bproducts\x18\x01 \x03(\v2\x16.ozon.products.ProductR\bproducts\"\xb3\x03\n" +
"\aProduct\x12\x0e\n" + "\aProduct\x12\x0e\n" +

View File

@@ -3,7 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"google.golang.org/grpc" "google.golang.org/grpc"
"net" "net"
@@ -31,7 +31,7 @@ func logMessage(level string, format string, a ...interface{}) {
} }
} }
func createGrpcServer() { func createGrpcServer(pool *pgxpool.Pool) {
lis, err := net.Listen("tcp", ":8080") lis, err := net.Listen("tcp", ":8080")
if err != nil { if err != nil {
@@ -40,7 +40,7 @@ func createGrpcServer() {
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
repo, err := marketplace.RegisterAdapterGRPC(grpcServer) repo, err := marketplace.RegisterAdapterGRPC(grpcServer, pool)
if err != nil { if err != nil {
fmt.Printf("failed to register gRPC server: %v\n", err) fmt.Printf("failed to register gRPC server: %v\n", err)
return return
@@ -82,22 +82,15 @@ func main() {
logMessage("info", "Redis client initialized successfully. 🟥") logMessage("info", "Redis client initialized successfully. 🟥")
// Initializing pgx connection // Initializing pgx connection
conn, err := pgx.Connect(ctx, os.Getenv("POSTGRES_URL")) dbpool, err := pgxpool.New(ctx, os.Getenv("POSTGRES_URL"))
if err != nil { if err != nil {
logMessage("error", "Failed to connect to PostgreSQL: %v", err) logMessage("error", "Failed to connect to PostgreSQL: %v", err)
return return
} }
defer conn.Close(ctx) defer dbpool.Close()
logMessage("info", "Connected to PostgreSQL successfully. 🐘") logMessage("info", "Connected to PostgreSQL successfully. 🐘")
createGrpcServer() createGrpcServer(dbpool)
// ------------------ shitting
//mpRepo := marketplace.NewDBRepository(conn)
//
//productsRepo := products.NewAPIRepository(
// mpRepo)
//_, err = productsRepo.GetAllProducts(ctx, 262)
//if err != nil {
// return
} }
//for _, item := range items { //for _, item := range items {

View File

@@ -2,11 +2,11 @@ package marketplace
import ( import (
"context" "context"
"github.com/jackc/pgx/v5"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
pb "sipro-mps/api/generated/v1/marketplace" pb "sipro-mps/api/generated/v1/marketplace"
"sipro-mps/internal/marketplace/db"
) )
// AdapterGRPC implements the gRPC server for the Marketplace service. // AdapterGRPC implements the gRPC server for the Marketplace service.
@@ -20,11 +20,11 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC {
repo: repo, repo: repo,
} }
} }
func RegisterAdapterGRPC(server *grpc.Server) (*Repository, error) { func RegisterAdapterGRPC(server *grpc.Server, conn db.DBTX) (*Repository, error) {
conn, err := pgx.Connect(context.Background(), "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql") //conn, err := pgx.Connect(context.Background(), "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql")
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
repo := NewDBRepository(conn) repo := NewDBRepository(conn)
adapter := NewAdapterGRPC(repo) adapter := NewAdapterGRPC(repo)
pb.RegisterMarketplaceServiceServer(server, adapter) pb.RegisterMarketplaceServiceServer(server, adapter)

View File

@@ -2,15 +2,14 @@ package marketplace
import ( import (
"context" "context"
"github.com/jackc/pgx/v5"
"sipro-mps/internal/marketplace/db" "sipro-mps/internal/marketplace/db"
) )
type dbRepository struct { type dbRepository struct {
conn *pgx.Conn conn db.DBTX
} }
func NewDBRepository(conn *pgx.Conn) Repository { func NewDBRepository(conn db.DBTX) Repository {
return &dbRepository{conn: conn} return &dbRepository{conn: conn}
} }

View File

@@ -1,6 +1,7 @@
package products package products
import ( import (
"fmt"
"github.com/samber/lo" "github.com/samber/lo"
"google.golang.org/grpc" "google.golang.org/grpc"
pb "sipro-mps/api/generated/v1/ozon/products" pb "sipro-mps/api/generated/v1/ozon/products"
@@ -26,20 +27,23 @@ func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Reposi
pb.RegisterProductsServiceServer(server, adapter) pb.RegisterProductsServiceServer(server, adapter)
return &apiRepo, nil return &apiRepo, nil
} }
func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error { func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
ctx := stream.Context() ctx := stream.Context()
fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId)
converter := generated.ConverterImpl{} converter := generated.ConverterImpl{}
resultChan := make(chan []OzonProduct) resultChan := make(chan []OzonProduct)
errChan := make(chan error) errChan := make(chan error)
g.repo.StreamAllProducts(ctx, 262, resultChan, errChan)
go g.repo.StreamAllProducts(ctx, int(req.MarketplaceId), resultChan, errChan) // Запускаем в горутине
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() // Handle context cancellation fmt.Println("GetListOfProducts: context cancelled or deadline exceeded:", ctx.Err())
return ctx.Err()
case products, ok := <-resultChan: case products, ok := <-resultChan:
if !ok { if !ok {
return nil return nil // Завершаем, только если результат закрыт
} }
protoProducts := lo.Map(products, func(product OzonProduct, _ int) *pb.Product { protoProducts := lo.Map(products, func(product OzonProduct, _ int) *pb.Product {
return converter.ToProto(&product) return converter.ToProto(&product)
@@ -48,13 +52,12 @@ func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream
Products: protoProducts, Products: protoProducts,
} }
if err := stream.Send(resp); err != nil { if err := stream.Send(resp); err != nil {
return err // Error sending response fmt.Println("GetListOfProducts: error sending response:", err)
return err
} }
case err, ok := <-errChan: case err, ok := <-errChan:
if !ok { if ok && err != nil {
return nil // Exit loop when errChan is closed fmt.Println("GetListOfProducts: error received from channel:", err)
}
if err != nil {
return err return err
} }
} }

View File

@@ -31,8 +31,8 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<-
}) })
if err != nil { if err != nil {
// dev // dev
panic(err) //panic(err)
//errChan <- fmt.Errorf("fetching product IDs: %w", err) errChan <- fmt.Errorf("fetching product IDs: %w", err)
return return
} }
@@ -65,8 +65,8 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha
}) })
if err != nil { if err != nil {
// dev // dev
panic(err) //panic(err)
//errChan <- fmt.Errorf("fetching products: %w", err) errChan <- fmt.Errorf("fetching products: %w", err)
return return
} }
items := resp.Items items := resp.Items

View File

@@ -43,7 +43,6 @@ type RateLimitTransport struct {
} }
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) { func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
ctx := req.Context() ctx := req.Context()
clientId := req.Header.Get("Client-Id") clientId := req.Header.Get("Client-Id")
now := time.Now().UnixNano() now := time.Now().UnixNano()
@@ -57,6 +56,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
return nil, fmt.Errorf("failed to execute rate limit script: %w", err) return nil, fmt.Errorf("failed to execute rate limit script: %w", err)
} }
if waitTime > 0 { if waitTime > 0 {
fmt.Printf("Rate limit exceeded for client %s, waiting for %d nanoseconds\n", clientId, waitTime)
time.Sleep(time.Duration(waitTime)) time.Sleep(time.Duration(waitTime))
} }
return t.RoundTripper.RoundTrip(req) return t.RoundTripper.RoundTrip(req)