Compare commits
	
		
			1 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 3fd63d5f32 | 
@@ -1,148 +1,24 @@
 | 
				
			|||||||
package main
 | 
					package main
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
					 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"net"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"sipro-mps/internal/config"
 | 
						"sipro-mps/internal/config"
 | 
				
			||||||
 | 
						"sipro-mps/internal/db"
 | 
				
			||||||
	"sipro-mps/internal/marketplace"
 | 
						"sipro-mps/internal/marketplace"
 | 
				
			||||||
	ozon "sipro-mps/internal/ozon/products"
 | 
						ozon_products "sipro-mps/internal/ozon/products"
 | 
				
			||||||
	"sipro-mps/internal/redis"
 | 
						"sipro-mps/internal/redis"
 | 
				
			||||||
	"sipro-mps/internal/tasks/client"
 | 
						"sipro-mps/internal/transport/grpc"
 | 
				
			||||||
	wb "sipro-mps/internal/wb/products"
 | 
					 | 
				
			||||||
	ym "sipro-mps/internal/ym/products"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/jackc/pgx/v5/pgxpool"
 | 
						"go.uber.org/fx"
 | 
				
			||||||
	"github.com/joho/godotenv"
 | 
					 | 
				
			||||||
	"google.golang.org/grpc"
 | 
					 | 
				
			||||||
	_ "google.golang.org/grpc/encoding/gzip"
 | 
						_ "google.golang.org/grpc/encoding/gzip"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func logMessage(level string, format string, a ...interface{}) {
 | 
					 | 
				
			||||||
	const (
 | 
					 | 
				
			||||||
		green  = "\033[32m"
 | 
					 | 
				
			||||||
		red    = "\033[31m"
 | 
					 | 
				
			||||||
		yellow = "\033[33m"
 | 
					 | 
				
			||||||
		blue   = "\033[34m"
 | 
					 | 
				
			||||||
		reset  = "\033[0m"
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
	switch level {
 | 
					 | 
				
			||||||
	case "info":
 | 
					 | 
				
			||||||
		fmt.Printf("%s✅ [INFO]%s %s %s\n", green, reset, fmt.Sprintf(format, a...), "ℹ️")
 | 
					 | 
				
			||||||
	case "error":
 | 
					 | 
				
			||||||
		fmt.Printf("%s❌ [ERROR]%s %s %s\n", red, reset, fmt.Sprintf(format, a...), "🚨")
 | 
					 | 
				
			||||||
	default:
 | 
					 | 
				
			||||||
		fmt.Printf("%s[LOG]%s %s\n", blue, reset, fmt.Sprintf(format, a...))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func createGrpcServer(pool *pgxpool.Pool) {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	lis, err := net.Listen("tcp", ":8080")
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Printf("failed to listen: %v\n", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	grpcServer := grpc.NewServer()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	repo, err := marketplace.RegisterAdapterGRPC(grpcServer, pool)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Printf("failed to register gRPC server: %v\n", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	_, err = ozon.RegisterAdapterGRPC(grpcServer, *repo)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Printf("failed to register Ozon Products gRPC server: %v\n", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	_, err = wb.RegisterAdapterGRPC(grpcServer, *repo)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Printf("failed to register Wildberries Products gRPC server: %v\n", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	_, err = ym.RegisterAdapterGRPC(grpcServer, *repo)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		fmt.Printf("failed to register Yandex Market Products gRPC server: %v\n", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fmt.Println("gRPC server registered successfully.")
 | 
					 | 
				
			||||||
	// Start serving gRPC requests
 | 
					 | 
				
			||||||
	fmt.Println("gRPC server is starting on port 8080...")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := grpcServer.Serve(lis); err != nil {
 | 
					 | 
				
			||||||
		fmt.Printf("failed to serve: %v\n", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	fmt.Println("gRPC server created.")
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func initDotenv() error {
 | 
					 | 
				
			||||||
	err := godotenv.Load()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("error loading .env file: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	logMessage("info", "Dotenv file loaded successfully. 🌱")
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func initRedisClient(ctx context.Context) error {
 | 
					 | 
				
			||||||
	err := redis.InitClient(ctx)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("error initializing Redis client: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	//defer redis.CloseClient()
 | 
					 | 
				
			||||||
	logMessage("info", "Redis client initialized successfully. 🟥")
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func initRedisLocker() error {
 | 
					 | 
				
			||||||
	err := redis.InitLocker()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("error initializing Redis locker: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	logMessage("info", "Redis locker initialized successfully. 🟥")
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func main() {
 | 
					func main() {
 | 
				
			||||||
	err := initDotenv()
 | 
						fx.New(
 | 
				
			||||||
	if err != nil {
 | 
							config.Module,
 | 
				
			||||||
		logMessage("error", "Failed to load .env file: %v", err)
 | 
							redis.Module,
 | 
				
			||||||
	}
 | 
							db.Module,
 | 
				
			||||||
	logMessage("info", "Starting the SIPRO Marketplace Server... 🚀1")
 | 
							grpc.Module,
 | 
				
			||||||
	ctx := context.Background()
 | 
							marketplace.Module,
 | 
				
			||||||
 | 
							ozon_products.Module,
 | 
				
			||||||
	// Initializing the Redis client
 | 
						).Run()
 | 
				
			||||||
	err = initRedisClient(ctx)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		logMessage("error", "Failed to initialize Redis client: %v", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer redis.CloseClient()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Initializing the Redis locker
 | 
					 | 
				
			||||||
	err = initRedisLocker()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		logMessage("error", "Failed to initialize Redis locker: %v", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer redis.CloseLocker()
 | 
					 | 
				
			||||||
	cfg, err := config.LoadConfig()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		logMessage("error", "Failed to load configuration: %v", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	client.InitClient(*cfg.Redis)
 | 
					 | 
				
			||||||
	// Initializing pgx connection
 | 
					 | 
				
			||||||
	dbpool, err := pgxpool.New(ctx, os.Getenv("POSTGRES_URL"))
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		logMessage("error", "Failed to connect to PostgreSQL: %v", err)
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer dbpool.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	createGrpcServer(dbpool)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@@ -77,6 +77,8 @@ require (
 | 
				
			|||||||
	github.com/wasilibs/wazero-helpers v0.0.0-20240620070341-3dff1577cd52 // indirect
 | 
						github.com/wasilibs/wazero-helpers v0.0.0-20240620070341-3dff1577cd52 // indirect
 | 
				
			||||||
	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 | 
						go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 | 
				
			||||||
	go.uber.org/atomic v1.11.0 // indirect
 | 
						go.uber.org/atomic v1.11.0 // indirect
 | 
				
			||||||
 | 
						go.uber.org/dig v1.19.0 // indirect
 | 
				
			||||||
 | 
						go.uber.org/fx v1.24.0 // indirect
 | 
				
			||||||
	go.uber.org/multierr v1.11.0 // indirect
 | 
						go.uber.org/multierr v1.11.0 // indirect
 | 
				
			||||||
	go.uber.org/zap v1.27.0 // indirect
 | 
						go.uber.org/zap v1.27.0 // indirect
 | 
				
			||||||
	golang.org/x/crypto v0.41.0 // indirect
 | 
						golang.org/x/crypto v0.41.0 // indirect
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@@ -178,6 +178,10 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 | 
				
			|||||||
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 | 
					go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 | 
				
			||||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 | 
					go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 | 
				
			||||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
 | 
					go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
 | 
				
			||||||
 | 
					go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
 | 
				
			||||||
 | 
					go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
 | 
				
			||||||
 | 
					go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg=
 | 
				
			||||||
 | 
					go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo=
 | 
				
			||||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 | 
					go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 | 
				
			||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 | 
					go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 | 
				
			||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 | 
					go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -7,9 +7,10 @@ type Config struct {
 | 
				
			|||||||
	Database *DatabaseConfig
 | 
						Database *DatabaseConfig
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func LoadConfig() (*Config, error) {
 | 
					func LoadConfig() (Config, error) {
 | 
				
			||||||
 | 
						println("Config loaded")
 | 
				
			||||||
	_ = godotenv.Load()
 | 
						_ = godotenv.Load()
 | 
				
			||||||
	redisConfig := LoadRedisConfig()
 | 
						redisConfig := LoadRedisConfig()
 | 
				
			||||||
	databaseConfig := LoadDatabaseConfig()
 | 
						databaseConfig := LoadDatabaseConfig()
 | 
				
			||||||
	return &Config{Redis: redisConfig, Database: databaseConfig}, nil
 | 
						return Config{Redis: redisConfig, Database: databaseConfig}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										7
									
								
								internal/config/fx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								internal/config/fx.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,7 @@
 | 
				
			|||||||
 | 
					package config
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "go.uber.org/fx"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Module = fx.Options(
 | 
				
			||||||
 | 
						fx.Provide(LoadConfig),
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
@@ -1,17 +1,26 @@
 | 
				
			|||||||
package db
 | 
					package db
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"database/sql"
 | 
						"context"
 | 
				
			||||||
 | 
						"sipro-mps/internal/config"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/jackc/pgx/v5/pgxpool"
 | 
				
			||||||
	_ "github.com/lib/pq"
 | 
						_ "github.com/lib/pq"
 | 
				
			||||||
 | 
						"go.uber.org/fx"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewConnection(dsn string) (*sql.DB, error) {
 | 
					func NewPgxPool(lc fx.Lifecycle, config config.Config) (*pgxpool.Pool, error) {
 | 
				
			||||||
	db, err := sql.Open("postgres", dsn)
 | 
						ctx := context.Background()
 | 
				
			||||||
 | 
						pool, err := pgxpool.New(ctx, config.Database.URL)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err := db.Ping(); err != nil {
 | 
						lc.Append(fx.Hook{
 | 
				
			||||||
		return nil, err
 | 
							OnStop: func(ctx context.Context) error {
 | 
				
			||||||
	}
 | 
								pool.Close()
 | 
				
			||||||
	return db, nil
 | 
								return nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return pool, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										16
									
								
								internal/db/fx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										16
									
								
								internal/db/fx.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,16 @@
 | 
				
			|||||||
 | 
					package db
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"sipro-mps/internal/marketplace/db"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"go.uber.org/fx"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Module = fx.Options(
 | 
				
			||||||
 | 
						fx.Provide(
 | 
				
			||||||
 | 
							fx.Annotate(
 | 
				
			||||||
 | 
								NewPgxPool,
 | 
				
			||||||
 | 
								fx.As(new(db.DBTX)),
 | 
				
			||||||
 | 
							),
 | 
				
			||||||
 | 
						),
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
@@ -2,11 +2,11 @@ package marketplace
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						pb "sipro-mps/api/generated/v1/marketplace"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"google.golang.org/grpc"
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
	"google.golang.org/grpc/codes"
 | 
						"google.golang.org/grpc/codes"
 | 
				
			||||||
	"google.golang.org/grpc/status"
 | 
						"google.golang.org/grpc/status"
 | 
				
			||||||
	pb "sipro-mps/api/generated/v1/marketplace"
 | 
					 | 
				
			||||||
	"sipro-mps/internal/marketplace/db"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// AdapterGRPC implements the gRPC server for the Marketplace service.
 | 
					// AdapterGRPC implements the gRPC server for the Marketplace service.
 | 
				
			||||||
@@ -20,15 +20,8 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC {
 | 
				
			|||||||
		repo: repo,
 | 
							repo: repo,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func RegisterAdapterGRPC(server *grpc.Server, conn db.DBTX) (*Repository, error) {
 | 
					func Register(server *grpc.Server, repo Repository) {
 | 
				
			||||||
	//conn, err := pgx.Connect(context.Background(), "postgresql://postgres:GjitkeYf%5Beq@/sipro?host=/run/postgresql")
 | 
						pb.RegisterMarketplaceServiceServer(server, NewAdapterGRPC(repo))
 | 
				
			||||||
	//if err != nil {
 | 
					 | 
				
			||||||
	//	return nil, err
 | 
					 | 
				
			||||||
	//}
 | 
					 | 
				
			||||||
	repo := NewDBRepository(conn)
 | 
					 | 
				
			||||||
	adapter := NewAdapterGRPC(repo)
 | 
					 | 
				
			||||||
	pb.RegisterMarketplaceServiceServer(server, adapter)
 | 
					 | 
				
			||||||
	return &repo, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *AdapterGRPC) GetMarketplaceById(ctx context.Context, r *pb.GetMarketplaceByIdRequest) (*pb.Marketplace, error) {
 | 
					func (g *AdapterGRPC) GetMarketplaceById(ctx context.Context, r *pb.GetMarketplaceByIdRequest) (*pb.Marketplace, error) {
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										12
									
								
								internal/marketplace/fx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								internal/marketplace/fx.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,12 @@
 | 
				
			|||||||
 | 
					package marketplace
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "go.uber.org/fx"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Module = fx.Options(
 | 
				
			||||||
 | 
						fx.Provide(
 | 
				
			||||||
 | 
							NewDBRepository,
 | 
				
			||||||
 | 
						),
 | 
				
			||||||
 | 
						fx.Invoke(
 | 
				
			||||||
 | 
							Register,
 | 
				
			||||||
 | 
						),
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
@@ -2,13 +2,15 @@ package ozon
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"git.denco.store/fakz9/ozon-api-client/ozon"
 | 
					 | 
				
			||||||
	"github.com/tidwall/gjson"
 | 
					 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"sipro-mps/internal/marketplace"
 | 
						"sipro-mps/internal/marketplace"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.denco.store/fakz9/ozon-api-client/ozon"
 | 
				
			||||||
 | 
						"github.com/redis/rueidis"
 | 
				
			||||||
 | 
						"github.com/tidwall/gjson"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error) {
 | 
					func GetClientFromMarketplace(redis rueidis.Client, mp *marketplace.Marketplace) (*ozon.Client, error) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	authDataParsed := gjson.Parse(mp.AuthData)
 | 
						authDataParsed := gjson.Parse(mp.AuthData)
 | 
				
			||||||
	clientIdResult := authDataParsed.Get("clientId")
 | 
						clientIdResult := authDataParsed.Get("clientId")
 | 
				
			||||||
@@ -19,7 +21,7 @@ func GetClientFromMarketplace(mp *marketplace.Marketplace) (*ozon.Client, error)
 | 
				
			|||||||
	apiKey := apiKeyResult.String()
 | 
						apiKey := apiKeyResult.String()
 | 
				
			||||||
	clientId := clientIdResult.String()
 | 
						clientId := clientIdResult.String()
 | 
				
			||||||
	httpClient := &http.Client{
 | 
						httpClient := &http.Client{
 | 
				
			||||||
		Transport: NewRateLimitTransport(),
 | 
							Transport: NewRateLimitTransport(redis),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	opts := []ozon.ClientOption{
 | 
						opts := []ozon.ClientOption{
 | 
				
			||||||
		ozon.WithAPIKey(apiKey),
 | 
							ozon.WithAPIKey(apiKey),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,10 +3,10 @@ package products
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						pb "sipro-mps/api/generated/v1/ozon/products"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/samber/lo"
 | 
						"github.com/samber/lo"
 | 
				
			||||||
	"google.golang.org/grpc"
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
	pb "sipro-mps/api/generated/v1/ozon/products"
 | 
					 | 
				
			||||||
	"sipro-mps/internal/marketplace"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type AdapterGRPC struct {
 | 
					type AdapterGRPC struct {
 | 
				
			||||||
@@ -20,13 +20,10 @@ func NewAdapterGRPC(repo Repository) *AdapterGRPC {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// RegisterAdapterGRPC registers the gRPC server for the Products service.
 | 
					func Register(server *grpc.Server, repo Repository) {
 | 
				
			||||||
func RegisterAdapterGRPC(server *grpc.Server, marketplaceRepo marketplace.Repository) (repo *Repository, err error) {
 | 
						pb.RegisterProductsServiceServer(server, NewAdapterGRPC(repo))
 | 
				
			||||||
	apiRepo := NewAPIRepository(marketplaceRepo)
 | 
					 | 
				
			||||||
	adapter := NewAdapterGRPC(apiRepo)
 | 
					 | 
				
			||||||
	pb.RegisterProductsServiceServer(server, adapter)
 | 
					 | 
				
			||||||
	return &apiRepo, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
 | 
					func (g *AdapterGRPC) GetListOfProducts(req *pb.GetListOfProductsRequest, stream pb.ProductsService_GetListOfProductsServer) error {
 | 
				
			||||||
	ctx := stream.Context()
 | 
						ctx := stream.Context()
 | 
				
			||||||
	fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId)
 | 
						fmt.Printf("GetListOfProducts called with req: %+v\n", req.MarketplaceId)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										8
									
								
								internal/ozon/products/fx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										8
									
								
								internal/ozon/products/fx.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,8 @@
 | 
				
			|||||||
 | 
					package products
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "go.uber.org/fx"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Module = fx.Options(
 | 
				
			||||||
 | 
						fx.Provide(NewAPIRepository),
 | 
				
			||||||
 | 
						fx.Invoke(Register),
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
@@ -7,26 +7,28 @@ import (
 | 
				
			|||||||
	"sipro-mps/internal/marketplace"
 | 
						"sipro-mps/internal/marketplace"
 | 
				
			||||||
	"sipro-mps/internal/ozon"
 | 
						"sipro-mps/internal/ozon"
 | 
				
			||||||
	"sipro-mps/internal/ozon/products/mapping/generated"
 | 
						"sipro-mps/internal/ozon/products/mapping/generated"
 | 
				
			||||||
	"sipro-mps/internal/redis"
 | 
					 | 
				
			||||||
	"sipro-mps/pkg/utils"
 | 
						"sipro-mps/pkg/utils"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	api "git.denco.store/fakz9/ozon-api-client/ozon"
 | 
						api "git.denco.store/fakz9/ozon-api-client/ozon"
 | 
				
			||||||
 | 
						"github.com/redis/rueidis"
 | 
				
			||||||
	"github.com/samber/lo"
 | 
						"github.com/samber/lo"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type apiRepository struct {
 | 
					type apiRepository struct {
 | 
				
			||||||
	marketplaceRepository marketplace.Repository
 | 
						marketplaceRepository marketplace.Repository
 | 
				
			||||||
 | 
						redis                 rueidis.Client
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func GetProductsKey(identifier string) string {
 | 
					func GetProductsKey(identifier string) string {
 | 
				
			||||||
	return fmt.Sprintf("ozon:products:%s:compressed", identifier)
 | 
						return fmt.Sprintf("ozon:products:%s:compressed", identifier)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewAPIRepository(marketplaceRepository marketplace.Repository) Repository {
 | 
					func NewAPIRepository(marketplaceRepository marketplace.Repository, redis rueidis.Client) Repository {
 | 
				
			||||||
	return &apiRepository{
 | 
						return &apiRepository{
 | 
				
			||||||
		marketplaceRepository: marketplaceRepository,
 | 
							marketplaceRepository: marketplaceRepository,
 | 
				
			||||||
 | 
							redis:                 redis,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -91,7 +93,7 @@ func (a *apiRepository) GetAllProducts(ctx context.Context, marketplaceId int) (
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp)
 | 
						clientFromMarketplace, err := ozon.GetClientFromMarketplace(a.redis, mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -115,7 +117,7 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int
 | 
				
			|||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	clientFromMarketplace, err := ozon.GetClientFromMarketplace(mp)
 | 
						clientFromMarketplace, err := ozon.GetClientFromMarketplace(a.redis, mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
@@ -128,31 +130,31 @@ func (a *apiRepository) StreamAllProducts(ctx context.Context, marketplaceId int
 | 
				
			|||||||
func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error) {
 | 
					func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId int, resultChan chan<- []PbProduct, errChan chan<- error) {
 | 
				
			||||||
	defer close(resultChan)
 | 
						defer close(resultChan)
 | 
				
			||||||
	defer close(errChan)
 | 
						defer close(errChan)
 | 
				
			||||||
	mp, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
 | 
						_, err := a.marketplaceRepository.GetMarketplaceByID(ctx, marketplaceId)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	identifier, err := mp.GetIdentifier()
 | 
						//identifier, err := mp.GetIdentifier()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
							errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	key := GetProductsKey(identifier)
 | 
						//key := GetProductsKey(identifier)
 | 
				
			||||||
	var cachedMessage pb.GetListOfProductsResponse
 | 
						var cachedMessage pb.GetListOfProductsResponse
 | 
				
			||||||
	err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
						//err = a.redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
				
			||||||
	if err == nil && len(cachedMessage.Products) > 0 {
 | 
						if err == nil && len(cachedMessage.Products) > 0 {
 | 
				
			||||||
		resultChan <- utils.DerefSlice(cachedMessage.Products)
 | 
							resultChan <- utils.DerefSlice(cachedMessage.Products)
 | 
				
			||||||
		//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
 | 
							//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	locker := *redis.Locker
 | 
						//locker := *redis.Locker
 | 
				
			||||||
	_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key))
 | 
						//_, cancel, err := locker.TryWithContext(ctx, fmt.Sprintf("ozon:products:marketplace:%s:lock", key))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	defer cancel()
 | 
						//defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	innerResultChan := make(chan []OzonProduct)
 | 
						innerResultChan := make(chan []OzonProduct)
 | 
				
			||||||
	innerErrChan := make(chan error)
 | 
						innerErrChan := make(chan error)
 | 
				
			||||||
@@ -164,8 +166,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
 | 
				
			|||||||
		if len(allProducts) == 0 {
 | 
							if len(allProducts) == 0 {
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		message := pb.GetListOfProductsResponse{Products: utils.ToPtrs(allProducts)}
 | 
							//message := pb.GetListOfProductsResponse{Products: utils.ToPtrs(allProducts)}
 | 
				
			||||||
		_ = redis.WriteProtoMessage(ctx, key, &message)
 | 
							//_ = redis.WriteProtoMessage(ctx, key, &message)
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
@@ -202,23 +204,23 @@ func (a *apiRepository) StreamProductAttributesCache(ctx context.Context, market
 | 
				
			|||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	identifier, err := mp.GetIdentifier()
 | 
						//identifier, err := mp.GetIdentifier()
 | 
				
			||||||
	if err != nil {
 | 
						//if err != nil {
 | 
				
			||||||
		errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
						//	errChan <- fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
		return
 | 
						//	return
 | 
				
			||||||
	}
 | 
						//}
 | 
				
			||||||
	ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
						ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		errChan <- err
 | 
							errChan <- err
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier)
 | 
						//key := fmt.Sprintf("ozon:product_attributes:%s:lock", identifier)
 | 
				
			||||||
	locker := *redis.Locker
 | 
						//locker := *redis.Locker
 | 
				
			||||||
	_, cancel, err := locker.WithContext(ctx, key)
 | 
						//_, cancel, err := locker.WithContext(ctx, key)
 | 
				
			||||||
	if err != nil {
 | 
						//if err != nil {
 | 
				
			||||||
		return
 | 
						//	return
 | 
				
			||||||
	}
 | 
						//}
 | 
				
			||||||
	defer cancel()
 | 
						//defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	converter := generated.ConverterImpl{}
 | 
						converter := generated.ConverterImpl{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -253,24 +255,24 @@ func (a *apiRepository) DeleteProducts(ctx context.Context, marketplaceId int, i
 | 
				
			|||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	identifier, err := mp.GetIdentifier()
 | 
						//identifier, err := mp.GetIdentifier()
 | 
				
			||||||
	if err != nil {
 | 
						//if err != nil {
 | 
				
			||||||
		return nil, fmt.Errorf("getting marketplace identifier: %w", err)
 | 
						//	return nil, fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
	}
 | 
						//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
						ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier)
 | 
						//key := fmt.Sprintf("ozon:products_delete:%s:lock", identifier)
 | 
				
			||||||
	locker := *redis.Locker
 | 
						//locker := *redis.Locker
 | 
				
			||||||
	_, cancel, err := locker.WithContext(ctx, key)
 | 
						//_, cancel, err := locker.WithContext(ctx, key)
 | 
				
			||||||
	if err != nil {
 | 
						//if err != nil {
 | 
				
			||||||
		return nil, err
 | 
						//	return nil, err
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
	}
 | 
						//}
 | 
				
			||||||
	defer cancel()
 | 
						//defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Step 1: map the items into a slice
 | 
						// Step 1: map the items into a slice
 | 
				
			||||||
	mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem {
 | 
						mapped := lo.Map(items, func(item *PbDeleteProductRequestItem, _ int) *PbDeleteProductResponseItem {
 | 
				
			||||||
@@ -322,24 +324,24 @@ func (a *apiRepository) CreateOrUpdateProducts(ctx context.Context, marketplaceI
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ozonClient, err := ozon.GetClientFromMarketplace(mp)
 | 
						ozonClient, err := ozon.GetClientFromMarketplace(a.redis, mp)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	identifier, err := mp.GetIdentifier()
 | 
						//identifier, err := mp.GetIdentifier()
 | 
				
			||||||
	if err != nil {
 | 
						//if err != nil {
 | 
				
			||||||
		return nil, fmt.Errorf("getting marketplace identifier: %w", err)
 | 
						//	return nil, fmt.Errorf("getting marketplace identifier: %w", err)
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						//key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier)
 | 
				
			||||||
 | 
						//locker := *redis.Locker
 | 
				
			||||||
	key := fmt.Sprintf("ozon:products_create_update:%s:lock", identifier)
 | 
						//_, cancel, err := locker.WithContext(ctx, key)
 | 
				
			||||||
	locker := *redis.Locker
 | 
						//if err != nil {
 | 
				
			||||||
	_, cancel, err := locker.WithContext(ctx, key)
 | 
						//	return nil, err
 | 
				
			||||||
	if err != nil {
 | 
						//
 | 
				
			||||||
		return nil, err
 | 
						//}
 | 
				
			||||||
 | 
						//defer cancel()
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer cancel()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	converter := generated.ConverterImpl{}
 | 
						converter := generated.ConverterImpl{}
 | 
				
			||||||
	pageSize := 100
 | 
						pageSize := 100
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2,10 +2,10 @@ package ozon
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"github.com/redis/rueidis"
 | 
					 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"sipro-mps/internal/redis"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/redis/rueidis"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
@@ -40,6 +40,7 @@ var (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type RateLimitTransport struct {
 | 
					type RateLimitTransport struct {
 | 
				
			||||||
	http.RoundTripper
 | 
						http.RoundTripper
 | 
				
			||||||
 | 
						redis rueidis.Client
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | 
					func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
@@ -47,7 +48,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
 | 
				
			|||||||
	clientId := req.Header.Get("Client-Id")
 | 
						clientId := req.Header.Get("Client-Id")
 | 
				
			||||||
	now := time.Now().UnixNano()
 | 
						now := time.Now().UnixNano()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	waitTime, err := rateLimiterScript.Exec(ctx, *redis.Client, []string{clientId}, []string{
 | 
						waitTime, err := rateLimiterScript.Exec(ctx, t.redis, []string{clientId}, []string{
 | 
				
			||||||
		fmt.Sprintf("%d", now),
 | 
							fmt.Sprintf("%d", now),
 | 
				
			||||||
		fmt.Sprintf("%d", int64(windowSize)),
 | 
							fmt.Sprintf("%d", int64(windowSize)),
 | 
				
			||||||
		fmt.Sprintf("%d", rps),
 | 
							fmt.Sprintf("%d", rps),
 | 
				
			||||||
@@ -61,7 +62,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return t.RoundTripper.RoundTrip(req)
 | 
						return t.RoundTripper.RoundTrip(req)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func NewRateLimitTransport() *RateLimitTransport {
 | 
					func NewRateLimitTransport(redis rueidis.Client) *RateLimitTransport {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return &RateLimitTransport{RoundTripper: http.DefaultTransport}
 | 
						return &RateLimitTransport{RoundTripper: http.DefaultTransport, redis: redis}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,132 +1,126 @@
 | 
				
			|||||||
package redis
 | 
					package redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bytes"
 | 
					 | 
				
			||||||
	"compress/flate"
 | 
					 | 
				
			||||||
	"compress/zlib"
 | 
					 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"sipro-mps/internal/config"
 | 
				
			||||||
	"io"
 | 
					 | 
				
			||||||
	"os"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/protobuf/proto"
 | 
					 | 
				
			||||||
	"github.com/redis/rueidis"
 | 
						"github.com/redis/rueidis"
 | 
				
			||||||
 | 
						"go.uber.org/fx"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var Client *rueidis.Client
 | 
					func NewRedisClient(lc fx.Lifecycle, config config.Config) (rueidis.Client, error) {
 | 
				
			||||||
 | 
					 | 
				
			||||||
func InitClient(ctx context.Context) error {
 | 
					 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	host := os.Getenv("REDIS_HOST")
 | 
						host := config.Redis.Host
 | 
				
			||||||
	//host := "redis"
 | 
						port := config.Redis.Port
 | 
				
			||||||
	port := os.Getenv("REDIS_PORT")
 | 
						password := config.Redis.Password
 | 
				
			||||||
	password := os.Getenv("REDIS_PASSWORD")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	client, err := rueidis.NewClient(rueidis.ClientOption{
 | 
						client, err := rueidis.NewClient(rueidis.ClientOption{
 | 
				
			||||||
		InitAddress: []string{host + ":" + port},
 | 
							InitAddress: []string{host + ":" + port},
 | 
				
			||||||
		Password:    password,
 | 
							Password:    password,
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	err = client.Do(ctx, client.B().Ping().Build()).Error()
 | 
						lc.Append(fx.Hook{
 | 
				
			||||||
	if err != nil {
 | 
							OnStart: func(ctx context.Context) error {
 | 
				
			||||||
		return err
 | 
								return client.Do(ctx, client.B().Ping().Build()).Error()
 | 
				
			||||||
	}
 | 
							},
 | 
				
			||||||
	Client = &client
 | 
							OnStop: func(ctx context.Context) error {
 | 
				
			||||||
 | 
								client.Close()
 | 
				
			||||||
			return nil
 | 
								return nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return client, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func CloseClient() {
 | 
					//func CloseClient() {
 | 
				
			||||||
	if Client != nil {
 | 
					//	if Client != nil {
 | 
				
			||||||
		(*Client).Close()
 | 
					//		(*Client).Close()
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
}
 | 
					//}
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
// WriteProtoMessage compresses and writes a protobuf message to Redis
 | 
					//// WriteProtoMessage compresses and writes a protobuf message to Redis
 | 
				
			||||||
func WriteProtoMessage(ctx context.Context, key string, message proto.Message, ttl ...time.Duration) error {
 | 
					//func WriteProtoMessage(ctx context.Context, key string, message proto.Message, ttl ...time.Duration) error {
 | 
				
			||||||
	if Client == nil {
 | 
					//	if Client == nil {
 | 
				
			||||||
		return fmt.Errorf("redis client not initialized")
 | 
					//		return fmt.Errorf("redis client not initialized")
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
	if message == nil {
 | 
					//	if message == nil {
 | 
				
			||||||
		return fmt.Errorf("message is nil")
 | 
					//		return fmt.Errorf("message is nil")
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
	// Marshal protobuf message
 | 
					//	// Marshal protobuf message
 | 
				
			||||||
	bytesMessage, err := proto.Marshal(message)
 | 
					//	bytesMessage, err := proto.Marshal(message)
 | 
				
			||||||
	if err != nil {
 | 
					//	if err != nil {
 | 
				
			||||||
		return fmt.Errorf("failed to marshal proto message: %w", err)
 | 
					//		return fmt.Errorf("failed to marshal proto message: %w", err)
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
	// Compress with zlib
 | 
					//	// Compress with zlib
 | 
				
			||||||
	var buf bytes.Buffer
 | 
					//	var buf bytes.Buffer
 | 
				
			||||||
	w, err := zlib.NewWriterLevel(&buf, flate.BestCompression)
 | 
					//	w, err := zlib.NewWriterLevel(&buf, flate.BestCompression)
 | 
				
			||||||
	if err != nil {
 | 
					//	if err != nil {
 | 
				
			||||||
		return fmt.Errorf("failed to create zlib writer: %w", err)
 | 
					//		return fmt.Errorf("failed to create zlib writer: %w", err)
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
	defer w.Close() // гарантированное закрытие
 | 
					//	defer w.Close() // гарантированное закрытие
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
	if _, err := w.Write(bytesMessage); err != nil {
 | 
					//	if _, err := w.Write(bytesMessage); err != nil {
 | 
				
			||||||
		return fmt.Errorf("failed to write to zlib writer: %w", err)
 | 
					//		return fmt.Errorf("failed to write to zlib writer: %w", err)
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
	if err := w.Close(); err != nil { // финализируем сжатие
 | 
					//	if err := w.Close(); err != nil { // финализируем сжатие
 | 
				
			||||||
		return fmt.Errorf("failed to close zlib writer: %w", err)
 | 
					//		return fmt.Errorf("failed to close zlib writer: %w", err)
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
	var ttlDuration time.Duration
 | 
					//	//var ttlDuration time.Duration
 | 
				
			||||||
	if len(ttl) > 0 {
 | 
					//	//if len(ttl) > 0 {
 | 
				
			||||||
		ttlDuration = ttl[0]
 | 
					//	//	ttlDuration = ttl[0]
 | 
				
			||||||
	} else {
 | 
					//	//} else {
 | 
				
			||||||
		ttlDuration = 3 * time.Hour // Default TTL of 24 hours
 | 
					//	//	ttlDuration = 3 * time.Hour // Default TTL of 24 hours
 | 
				
			||||||
	}
 | 
					//	//}
 | 
				
			||||||
	// Write to Redis
 | 
					//	// Write to Redis
 | 
				
			||||||
	if err := (*Client).Do(ctx, (*Client).B().
 | 
					//	if err := (*Client).Do(ctx, (*Client).B().
 | 
				
			||||||
		Set().
 | 
					//		Set().
 | 
				
			||||||
		Key(key).
 | 
					//		Key(key).
 | 
				
			||||||
		Value(rueidis.BinaryString(buf.Bytes())).
 | 
					//		Value(rueidis.BinaryString(buf.Bytes())).
 | 
				
			||||||
		Ex(ttlDuration).
 | 
					//		Build()).
 | 
				
			||||||
		Build()).
 | 
					//		Error(); err != nil {
 | 
				
			||||||
		Error(); err != nil {
 | 
					//		return fmt.Errorf("failed to write compressed data to Redis: %w", err)
 | 
				
			||||||
		return fmt.Errorf("failed to write compressed data to Redis: %w", err)
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//	return nil
 | 
				
			||||||
	return nil
 | 
					//}
 | 
				
			||||||
}
 | 
					//
 | 
				
			||||||
 | 
					//// ReadProtoMessage reads and decompresses a protobuf message from Redis
 | 
				
			||||||
// ReadProtoMessage reads and decompresses a protobuf message from Redis
 | 
					//func ReadProtoMessage(ctx context.Context, key string, message proto.Message) error {
 | 
				
			||||||
func ReadProtoMessage(ctx context.Context, key string, message proto.Message) error {
 | 
					//	if Client == nil {
 | 
				
			||||||
	if Client == nil {
 | 
					//		return fmt.Errorf("redis client not initialized")
 | 
				
			||||||
		return fmt.Errorf("redis client not initialized")
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//	if message == nil {
 | 
				
			||||||
	if message == nil {
 | 
					//		return fmt.Errorf("message is nil")
 | 
				
			||||||
		return fmt.Errorf("message is nil")
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//
 | 
				
			||||||
 | 
					//	// Get bytes from Redis
 | 
				
			||||||
	// Get bytes from Redis
 | 
					//	resp, err := (*Client).Do(ctx, (*Client).B().Get().Key(key).Build()).AsBytes()
 | 
				
			||||||
	resp, err := (*Client).Do(ctx, (*Client).B().Get().Key(key).Build()).AsBytes()
 | 
					//	if err != nil {
 | 
				
			||||||
	if err != nil {
 | 
					//		return fmt.Errorf("failed to read data from Redis: %w", err)
 | 
				
			||||||
		return fmt.Errorf("failed to read data from Redis: %w", err)
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//	if resp == nil {
 | 
				
			||||||
	if resp == nil {
 | 
					//		return fmt.Errorf("no data found for key: %s", key)
 | 
				
			||||||
		return fmt.Errorf("no data found for key: %s", key)
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//
 | 
				
			||||||
 | 
					//	// Decompress
 | 
				
			||||||
	// Decompress
 | 
					//	reader, err := zlib.NewReader(bytes.NewReader(resp))
 | 
				
			||||||
	reader, err := zlib.NewReader(bytes.NewReader(resp))
 | 
					//	if err != nil {
 | 
				
			||||||
	if err != nil {
 | 
					//		return fmt.Errorf("failed to create zlib reader: %w", err)
 | 
				
			||||||
		return fmt.Errorf("failed to create zlib reader: %w", err)
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//	defer reader.Close()
 | 
				
			||||||
	defer reader.Close()
 | 
					//
 | 
				
			||||||
 | 
					//	decompressed, err := io.ReadAll(reader)
 | 
				
			||||||
	decompressed, err := io.ReadAll(reader)
 | 
					//	if err != nil {
 | 
				
			||||||
	if err != nil {
 | 
					//		return fmt.Errorf("failed to decompress data: %w", err)
 | 
				
			||||||
		return fmt.Errorf("failed to decompress data: %w", err)
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//
 | 
				
			||||||
 | 
					//	// Unmarshal protobuf
 | 
				
			||||||
	// Unmarshal protobuf
 | 
					//	if err := proto.Unmarshal(decompressed, message); err != nil {
 | 
				
			||||||
	if err := proto.Unmarshal(decompressed, message); err != nil {
 | 
					//		return fmt.Errorf("failed to unmarshal proto message: %w", err)
 | 
				
			||||||
		return fmt.Errorf("failed to unmarshal proto message: %w", err)
 | 
					//	}
 | 
				
			||||||
	}
 | 
					//
 | 
				
			||||||
 | 
					//	return nil
 | 
				
			||||||
	return nil
 | 
					//}
 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										8
									
								
								internal/redis/fx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										8
									
								
								internal/redis/fx.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,8 @@
 | 
				
			|||||||
 | 
					package redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "go.uber.org/fx"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Module = fx.Options(
 | 
				
			||||||
 | 
						fx.Provide(NewRedisClient),
 | 
				
			||||||
 | 
						fx.Provide(NewRedisLocker),
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
@@ -1,30 +1,29 @@
 | 
				
			|||||||
package redis
 | 
					package redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/redis/rueidis"
 | 
						"github.com/redis/rueidis"
 | 
				
			||||||
	"github.com/redis/rueidis/rueidislock"
 | 
						"github.com/redis/rueidis/rueidislock"
 | 
				
			||||||
 | 
						"go.uber.org/fx"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var Locker *rueidislock.Locker
 | 
					func NewRedisLocker(lc fx.Lifecycle) (rueidislock.Locker, error) {
 | 
				
			||||||
 | 
					 | 
				
			||||||
func InitLocker() error {
 | 
					 | 
				
			||||||
	redisAddr := os.Getenv("REDIS_ADDR")
 | 
						redisAddr := os.Getenv("REDIS_ADDR")
 | 
				
			||||||
	password := os.Getenv("REDIS_PASSWORD")
 | 
						password := os.Getenv("REDIS_PASSWORD")
 | 
				
			||||||
	locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
 | 
						locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
 | 
				
			||||||
		ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
 | 
							ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	Locker = &locker
 | 
						lc.Append(fx.Hook{
 | 
				
			||||||
	return nil
 | 
							OnStop: func(_ context.Context) error {
 | 
				
			||||||
}
 | 
								locker.Close()
 | 
				
			||||||
func CloseLocker() {
 | 
					 | 
				
			||||||
	if Locker != nil {
 | 
					 | 
				
			||||||
		(*Locker).Close()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	Locker = nil
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return locker, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										33
									
								
								internal/redis/utils.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								internal/redis/utils.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,33 @@
 | 
				
			|||||||
 | 
					package redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//func WriteString(ctx context.Context, key string, value string, ttl ...time.Duration) error {
 | 
				
			||||||
 | 
					//	if Client == nil {
 | 
				
			||||||
 | 
					//		return rueidis.Nil
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	var expiration time.Duration
 | 
				
			||||||
 | 
					//	if len(ttl) > 0 {
 | 
				
			||||||
 | 
					//		expiration = ttl[0]
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	return (*Client).Do(ctx, (*Client).B().
 | 
				
			||||||
 | 
					//		Set().
 | 
				
			||||||
 | 
					//		Key(key).
 | 
				
			||||||
 | 
					//		Value(value).
 | 
				
			||||||
 | 
					//		Ex(expiration).
 | 
				
			||||||
 | 
					//		Build()).Error()
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//func ReadString(ctx context.Context, key string) (string, error) {
 | 
				
			||||||
 | 
					//	if Client == nil {
 | 
				
			||||||
 | 
					//		return "", rueidis.Nil
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	resp := (*Client).Do(ctx, (*Client).B().
 | 
				
			||||||
 | 
					//		Get().
 | 
				
			||||||
 | 
					//		Key(key).
 | 
				
			||||||
 | 
					//		Build())
 | 
				
			||||||
 | 
					//	if resp.Error() != nil {
 | 
				
			||||||
 | 
					//		return "", resp.Error()
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	return resp.ToString()
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
							
								
								
									
										7
									
								
								internal/transport/grpc/fx.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								internal/transport/grpc/fx.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,7 @@
 | 
				
			|||||||
 | 
					package grpc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import "go.uber.org/fx"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var Module = fx.Options(
 | 
				
			||||||
 | 
						fx.Provide(NewGrpcServer),
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
							
								
								
									
										29
									
								
								internal/transport/grpc/server.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										29
									
								
								internal/transport/grpc/server.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,29 @@
 | 
				
			|||||||
 | 
					package grpc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"net"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"go.uber.org/fx"
 | 
				
			||||||
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewGrpcServer(lc fx.Lifecycle) *grpc.Server {
 | 
				
			||||||
 | 
						server := grpc.NewServer()
 | 
				
			||||||
 | 
						lc.Append(fx.Hook{
 | 
				
			||||||
 | 
							OnStart: func(ctx context.Context) error {
 | 
				
			||||||
 | 
								lis, err := net.Listen("tcp", ":8080")
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								println("Running server on :8080")
 | 
				
			||||||
 | 
								go server.Serve(lis)
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							OnStop: func(ctx context.Context) error {
 | 
				
			||||||
 | 
								server.GracefulStop()
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						return server
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -64,6 +64,7 @@ end
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type RateLimitTransport struct {
 | 
					type RateLimitTransport struct {
 | 
				
			||||||
	http.RoundTripper
 | 
						http.RoundTripper
 | 
				
			||||||
 | 
						redis rueidis.Client
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | 
					func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
@@ -84,7 +85,7 @@ func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error
 | 
				
			|||||||
		return nil, fmt.Errorf("sellerId is required in JWT claims")
 | 
							return nil, fmt.Errorf("sellerId is required in JWT claims")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	now := time.Now().UnixMilli()
 | 
						now := time.Now().UnixMilli()
 | 
				
			||||||
	client := *redis.Client
 | 
						client := t.redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	waitTime, err := tokenBucketScript.Exec(ctx, client, []string{sellerId}, []string{
 | 
						waitTime, err := tokenBucketScript.Exec(ctx, client, []string{sellerId}, []string{
 | 
				
			||||||
		fmt.Sprintf("%d", now),
 | 
							fmt.Sprintf("%d", now),
 | 
				
			||||||
@@ -113,7 +114,7 @@ func SyncRateLimitRemaining(ctx context.Context, sellerId string, remaining int)
 | 
				
			|||||||
		return fmt.Errorf("invalid sellerId or remaining")
 | 
							return fmt.Errorf("invalid sellerId or remaining")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	now := time.Now().UnixMilli()
 | 
						now := time.Now().UnixMilli()
 | 
				
			||||||
	client := *redis.Client
 | 
						client :=
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	cmds := []rueidis.Completed{
 | 
						cmds := []rueidis.Completed{
 | 
				
			||||||
		client.B().Set().Key(sellerId + ":capacity").Value(fmt.Sprintf("%d", defaultBucketCapacity)).Ex(time.Minute).Build(),
 | 
							client.B().Set().Key(sellerId + ":capacity").Value(fmt.Sprintf("%d", defaultBucketCapacity)).Ex(time.Minute).Build(),
 | 
				
			||||||
@@ -167,6 +168,6 @@ func SetRateLimitRetry(ctx context.Context, sellerId string, retrySeconds int, l
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewRateLimitTransport() *RateLimitTransport {
 | 
					func NewRateLimitTransport(client rueidis.Client) *RateLimitTransport {
 | 
				
			||||||
	return &RateLimitTransport{RoundTripper: http.DefaultTransport}
 | 
						return &RateLimitTransport{RoundTripper: http.DefaultTransport}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user