package aaa
import (
	"crypto/subtle"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/golang-jwt/jwt/v5"
)
// secretKey is the HMAC key used both to sign issued tokens and to verify
// incoming ones.
// NOTE(review): the key is hard-coded in the source; consider supplying it
// through configuration so it is not shared by every build.
const secretKey = "something secret here" // token sign key
// adminRole is the subject claim written into tokens issued by Login.
const adminRole = "superuser" // token subject
// AAA implements authentication, authorization and accounting for the API.
//
// It holds the administrator credentials read from the environment and the
// lifetime applied to every token it issues.
type AAA struct {
	users    map[string]string // user name -> password, compared verbatim on login
	tokenTTL time.Duration     // added to the current time to obtain a token's expiry
	log      *slog.Logger
}

// New builds an AAA whose only built-in account is the administrator described
// by the ADMIN_USER and ADMIN_PASSWORD environment variables.
//
// It returns an error when either variable is unset or empty: an empty value
// would otherwise create an account that accepts blank credentials.
func New(tokenTTL time.Duration, log *slog.Logger) (AAA, error) {
	const (
		adminUser = "ADMIN_USER"
		adminPass = "ADMIN_PASSWORD"
	)

	user, ok := os.LookupEnv(adminUser)
	if !ok || user == "" {
		return AAA{}, fmt.Errorf("could not get admin user from environment")
	}
	password, ok := os.LookupEnv(adminPass)
	if !ok || password == "" {
		return AAA{}, fmt.Errorf("could not get admin password from environment")
	}

	return AAA{
		users:    map[string]string{user: password},
		tokenTTL: tokenTTL,
		log:      log,
	}, nil
}
// Login authenticates one of the built-in accounts and, on success, returns a
// signed HS256 token whose subject is adminRole and which expires tokenTTL
// from now.
//
// The password is compared with subtle.ConstantTimeCompare so that response
// timing does not reveal how much of a guess was correct. An unknown user and
// a wrong password yield the same "invalid credentials" error.
func (a AAA) Login(name, password string) (string, error) {
	expectedPass, known := a.users[name]
	// The comparison runs for unknown users too, so both failure paths do the
	// same work before returning.
	match := subtle.ConstantTimeCompare([]byte(expectedPass), []byte(password)) == 1
	if !known || !match {
		return "", fmt.Errorf("invalid credentials")
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.RegisteredClaims{
		Subject:   adminRole,
		ExpiresAt: jwt.NewNumericDate(time.Now().Add(a.tokenTTL)),
	})
	return token.SignedString([]byte(secretKey))
}
// userRole is the subject claim written into tokens issued by LoginFromDB.
const userRole = "user"
// LoginFromDB authenticates a regular user by delegating the credential check
// to checkPassword and, when that succeeds, issues a signed HS256 token with
// the userRole subject that expires tokenTTL from now.
//
// Whatever error checkPassword reports is replaced by a generic
// "invalid credentials" error.
func (a AAA) LoginFromDB(name, password string, checkPassword func(name, password string) error) (string, error) {
	if checkErr := checkPassword(name, password); checkErr != nil {
		return "", fmt.Errorf("invalid credentials")
	}

	expiry := jwt.NewNumericDate(time.Now().Add(a.tokenTTL))
	claims := jwt.RegisteredClaims{Subject: userRole, ExpiresAt: expiry}
	unsigned := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return unsigned.SignedString([]byte(secretKey))
}
// Verify checks that tokenString is a valid token whose subject is adminRole.
// It returns nil on success and the error produced by verifyRoles otherwise.
func (a AAA) Verify(tokenString string) error {
return a.verifyRoles(tokenString, adminRole)
}
// VerifyAny checks that tokenString is a valid token whose subject is either
// adminRole or userRole. It returns nil on success and the error produced by
// verifyRoles otherwise.
func (a AAA) VerifyAny(tokenString string) error {
return a.verifyRoles(tokenString, adminRole, userRole)
}
// verifyRoles parses tokenString, validates its signature with secretKey and
// checks that the token subject equals one of roles.
//
// Only tokens signed with an HMAC method are accepted. The returned error is
// "invalid token: ..." when parsing fails, "invalid token claims" when the
// claims are unusable, and "insufficient permissions" when the subject matches
// none of roles.
func (a AAA) verifyRoles(tokenString string, roles ...string) error {
	// keyFunc hands out the verification key, but only to HMAC-signed tokens.
	keyFunc := func(t *jwt.Token) (any, error) {
		if _, isHMAC := t.Method.(*jwt.SigningMethodHMAC); isHMAC {
			return []byte(secretKey), nil
		}
		return nil, fmt.Errorf("unexpected signing method: %v", t.Header["alg"])
	}

	parsed, err := jwt.ParseWithClaims(tokenString, &jwt.RegisteredClaims{}, keyFunc)
	if err != nil {
		return fmt.Errorf("invalid token: %w", err)
	}

	claims, isRegistered := parsed.Claims.(*jwt.RegisteredClaims)
	if !(isRegistered && parsed.Valid) {
		return fmt.Errorf("invalid token claims")
	}

	for i := range roles {
		if roles[i] == claims.Subject {
			return nil
		}
	}
	return fmt.Errorf("insufficient permissions")
}
package db
import (
"context"
"embed"
"fmt"
"log/slog"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/pgx"
"github.com/golang-migrate/migrate/v4/source/iofs"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/jmoiron/sqlx"
"golang.org/x/crypto/bcrypt"
)
// migrationFiles holds the *.sql files of the migrations directory, embedded
// into the binary; Migrate reads them through an iofs source.
//go:embed migrations/*.sql
var migrationFiles embed.FS
// DB bundles a database connection with the logger used by its methods.
type DB struct {
// log receives migration progress and failure messages.
log *slog.Logger
// conn is the sqlx handle opened by New.
conn *sqlx.DB
}
// New connects to the database at address using the "pgx" driver and returns
// a DB that wraps the connection.
//
// On failure the problem is logged through log and the connection error is
// returned unchanged.
// NOTE(review): address is written to the log verbatim; if it is a DSN that
// embeds credentials they end up in the log output — confirm this is intended.
func New(log *slog.Logger, address string) (*DB, error) {
	conn, err := sqlx.Connect("pgx", address)
	if err == nil {
		return &DB{log: log, conn: conn}, nil
	}

	log.Error("connection problem", "address", address, "error", err)
	return nil, err
}
// Migrate applies the embedded SQL migrations to the connected database.
//
// Applied versions are tracked in the "api_schema_migrations" table. A run
// that finds nothing to apply (migrate.ErrNoChange) counts as success; any
// other failure of the up-migration is logged and returned.
func (db *DB) Migrate() error {
	db.log.Debug("running migration")

	source, err := iofs.New(migrationFiles, "migrations")
	if err != nil {
		return err
	}

	cfg := &pgx.Config{MigrationsTable: "api_schema_migrations"}
	target, err := pgx.WithInstance(db.conn.DB, cfg)
	if err != nil {
		return err
	}

	migrator, err := migrate.NewWithInstance("iofs", source, "pgx", target)
	if err != nil {
		return err
	}

	switch upErr := migrator.Up(); upErr {
	case nil, migrate.ErrNoChange:
		db.log.Debug("migration finished")
		return nil
	default:
		db.log.Error("migration failed", "error", upErr)
		return upErr
	}
}
// CreateUser stores a new row in the users table for username, keeping only a
// bcrypt hash (default cost) of password.
//
// It returns a "hash password" error when hashing fails and a "create user"
// error when the INSERT fails; both wrap the underlying cause.
func (db *DB) CreateUser(ctx context.Context, username, password string) error {
	hashed, hashErr := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
	if hashErr != nil {
		return fmt.Errorf("hash password: %w", hashErr)
	}

	const insertUser = `INSERT INTO users (username, password) VALUES ($1, $2)`
	if _, execErr := db.conn.ExecContext(ctx, insertUser, username, string(hashed)); execErr != nil {
		return fmt.Errorf("create user: %w", execErr)
	}
	return nil
}
// CheckPassword loads the stored bcrypt hash for username and compares it
// with password.
//
// Any failure to read the hash, including a missing row, is reported as a
// wrapped "user not found" error; a hash mismatch is reported as
// "invalid password". It returns nil when the password matches.
func (db *DB) CheckPassword(ctx context.Context, username, password string) error {
	const selectHash = `SELECT password FROM users WHERE username = $1`

	var stored string
	row := db.conn.QueryRowContext(ctx, selectHash, username)
	if scanErr := row.Scan(&stored); scanErr != nil {
		return fmt.Errorf("user not found: %w", scanErr)
	}

	if bcrypt.CompareHashAndPassword([]byte(stored), []byte(password)) != nil {
		return fmt.Errorf("invalid password")
	}
	return nil
}
package rest
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
"strconv"
"context"
"github.com/VictoriaMetrics/metrics"
"yadro.com/course/api/core"
)
// NewMetricsHandler returns a handler that writes the registered metrics to
// the response in Prometheus text format.
func NewMetricsHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// NOTE(review): the second argument presumably enables process-level
// metrics — confirm against the VictoriaMetrics/metrics documentation.
metrics.WritePrometheus(w, true)
}
}
// NewPingHandler returns a handler that pings every entry of pingers and
// answers with a JSON object {"replies": {name: status}}.
//
// A status is "ok" when the ping succeeds and "error: <message>" otherwise.
// The log argument is currently unused.
func NewPingHandler(log *slog.Logger, pingers map[string]core.Pinger) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		replies := make(map[string]string)
		for name, pinger := range pingers {
			status := "ok"
			if pingErr := pinger.Ping(ctx); pingErr != nil {
				status = "error: " + pingErr.Error()
			}
			replies[name] = status
		}
		writeJSON(w, map[string]any{"replies": replies})
	}
}
// Authenticator issues tokens for the login handlers in this package.
type Authenticator interface {
// Login checks user and password and returns a token on success.
Login(user, password string) (string, error)
// LoginFromDB validates the credentials through checkPassword and returns a
// token on success.
LoginFromDB(name, password string, checkPassword func(name, password string) error) (string, error)
}
// UserChecker validates the credentials of a stored user; NewUserLoginHandler
// passes it to Authenticator.LoginFromDB as the password check.
type UserChecker interface {
// CheckPassword returns nil when password is correct for username.
CheckPassword(ctx context.Context, username, password string) error
}
// UserRegistrar persists new user accounts for NewRegisterHandler.
type UserRegistrar interface {
// CreateUser stores a new account; a non-nil error makes the handler answer
// with 409 Conflict.
CreateUser(ctx context.Context, username, password string) error
}
// NewRegisterHandler returns a handler that creates a user account from a
// JSON body of the form {"name": ..., "password": ...}.
//
// Responses: 400 when the body is not valid JSON or either field is empty,
// 409 when storage.CreateUser fails for any reason (the error is logged),
// 201 with an empty body on success.
func NewRegisterHandler(log *slog.Logger, storage UserRegistrar) http.HandlerFunc {
	type registerRequest struct {
		Name     string `json:"name"`
		Password string `json:"password"`
	}

	return func(w http.ResponseWriter, r *http.Request) {
		var payload registerRequest
		if decodeErr := json.NewDecoder(r.Body).Decode(&payload); decodeErr != nil {
			http.Error(w, "invalid request body", http.StatusBadRequest)
			return
		}
		if len(payload.Name) == 0 || len(payload.Password) == 0 {
			http.Error(w, "name and password are required", http.StatusBadRequest)
			return
		}

		createErr := storage.CreateUser(r.Context(), payload.Name, payload.Password)
		if createErr != nil {
			log.Error("register failed", "error", createErr)
			http.Error(w, "user already exists or internal error", http.StatusConflict)
			return
		}
		w.WriteHeader(http.StatusCreated)
	}
}
// NewUserLoginHandler returns a handler that logs in a stored user from a JSON
// body {"name": ..., "password": ...} and writes the issued token as
// text/plain.
//
// The credentials are verified by checker.CheckPassword, invoked through
// auth.LoginFromDB with the request context. Responses: 400 for an
// undecodable body, 401 when login fails (the error is logged).
func NewUserLoginHandler(log *slog.Logger, auth Authenticator, checker UserChecker) http.HandlerFunc {
	type credentials struct {
		Name     string `json:"name"`
		Password string `json:"password"`
	}

	return func(w http.ResponseWriter, r *http.Request) {
		var creds credentials
		if decodeErr := json.NewDecoder(r.Body).Decode(&creds); decodeErr != nil {
			http.Error(w, "invalid request body", http.StatusBadRequest)
			return
		}

		// verify binds the request context to the storage lookup.
		verify := func(name, password string) error {
			return checker.CheckPassword(r.Context(), name, password)
		}
		token, loginErr := auth.LoginFromDB(creds.Name, creds.Password, verify)
		if loginErr != nil {
			log.Error("user login failed", "error", loginErr)
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}

		w.Header().Set("Content-Type", "text/plain")
		_, writeErr := w.Write([]byte(token))
		if writeErr != nil {
			log.Error("failed to write token", "error", writeErr)
		}
	}
}
// NewLoginHandler returns a handler that logs in through auth.Login using a
// JSON body {"name": ..., "password": ...} and writes the issued token as
// text/plain.
//
// Responses: 400 for an undecodable body, 401 when login fails (the error is
// logged).
func NewLoginHandler(log *slog.Logger, auth Authenticator) http.HandlerFunc {
	type credentials struct {
		Name     string `json:"name"`
		Password string `json:"password"`
	}

	return func(w http.ResponseWriter, r *http.Request) {
		var creds credentials
		if decodeErr := json.NewDecoder(r.Body).Decode(&creds); decodeErr != nil {
			http.Error(w, "invalid request body", http.StatusBadRequest)
			return
		}

		token, loginErr := auth.Login(creds.Name, creds.Password)
		if loginErr != nil {
			log.Error("login failed", "error", loginErr)
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}

		w.Header().Set("Content-Type", "text/plain")
		_, writeErr := w.Write([]byte(token))
		if writeErr != nil {
			log.Error("failed to write token", "error", writeErr)
		}
	}
}
// NewUpdateHandler returns a handler that triggers updater.Update.
//
// Responses: 202 when the updater reports core.ErrAlreadyExists, 500 with the
// error text for any other failure (which is also logged), 200 on success.
func NewUpdateHandler(log *slog.Logger, updater core.Updater) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		switch err := updater.Update(r.Context()); {
		case errors.Is(err, core.ErrAlreadyExists):
			w.WriteHeader(http.StatusAccepted)
		case err != nil:
			log.Error("update failed", "error", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
		default:
			w.WriteHeader(http.StatusOK)
		}
	}
}
// NewUpdateStatsHandler returns a handler that reports updater.Stats as a JSON
// object with the keys words_total, words_unique, comics_fetched and
// comics_total.
//
// A failing Stats call is logged and answered with 500 and the error text.
func NewUpdateStatsHandler(log *slog.Logger, updater core.Updater) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		stats, err := updater.Stats(r.Context())
		if err != nil {
			log.Error("stats failed", "error", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		payload := make(map[string]int, 4)
		payload["words_total"] = stats.WordsTotal
		payload["words_unique"] = stats.WordsUnique
		payload["comics_fetched"] = stats.ComicsFetched
		payload["comics_total"] = stats.ComicsTotal
		writeJSON(w, payload)
	}
}
// NewUpdateStatusHandler returns a handler that reports updater.Status as the
// JSON object {"status": "<value>"}.
//
// A failing Status call is logged and answered with 500 and the error text.
func NewUpdateStatusHandler(log *slog.Logger, updater core.Updater) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		current, err := updater.Status(r.Context())
		if err == nil {
			writeJSON(w, map[string]string{"status": string(current)})
			return
		}

		log.Error("status failed", "error", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
// NewDropHandler returns a handler that calls updater.Drop.
//
// It answers 200 on success; a failure is logged and answered with 500 and
// the error text.
func NewDropHandler(log *slog.Logger, updater core.Updater) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		dropErr := updater.Drop(r.Context())
		if dropErr == nil {
			w.WriteHeader(http.StatusOK)
			return
		}

		log.Error("drop failed", "error", dropErr)
		http.Error(w, dropErr.Error(), http.StatusInternalServerError)
	}
}
// parsePageParams reads the "limit" and "page" query parameters of r.
//
// A missing or empty parameter falls back to its default (limit 10, page 1).
// A value that is not an integer or is smaller than 1 yields (0, 0) together
// with an "invalid limit" or "invalid page" error; limit is validated first.
func parsePageParams(r *http.Request) (limit, page int, err error) {
	query := r.URL.Query()

	// positive returns def when key is absent, otherwise the parsed value and
	// whether it is a valid integer >= 1.
	positive := func(key string, def int) (int, bool) {
		raw := query.Get(key)
		if raw == "" {
			return def, true
		}
		n, convErr := strconv.Atoi(raw)
		return n, convErr == nil && n >= 1
	}

	var ok bool
	if limit, ok = positive("limit", 10); !ok {
		return 0, 0, errors.New("invalid limit")
	}
	if page, ok = positive("page", 1); !ok {
		return 0, 0, errors.New("invalid page")
	}
	return limit, page, nil
}
// NewSearchHandler returns a handler that searches comics by phrase and serves
// one page of the result as JSON.
//
// Query parameters: "phrase" (required) plus "limit" and "page" (optional,
// parsed by parsePageParams). The handler calls searcher.Search with limit 0
// (presumably "no limit" — confirm against the Searcher implementation) and
// slices the requested page out of the returned list in memory. The response
// holds the page items under "comics", the overall match count under "total",
// the echoed "page" and the page count "pages" (never less than 1).
//
// Responses: 400 when phrase is missing or a paging parameter is invalid,
// 500 with the error text when the search fails (the error is logged).
//
// All paging arithmetic is overflow-free: limit and page come straight from
// the client and may be as large as the int range allows, so products and
// sums of them must not be formed before they are bounded by total.
func NewSearchHandler(log *slog.Logger, searcher core.Searcher) http.HandlerFunc {
	type comicsItem struct {
		ID  int    `json:"id"`
		URL string `json:"url"`
	}

	return func(w http.ResponseWriter, r *http.Request) {
		phrase := r.URL.Query().Get("phrase")
		if phrase == "" {
			http.Error(w, "phrase is required", http.StatusBadRequest)
			return
		}
		limit, page, err := parsePageParams(r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		all, err := searcher.Search(r.Context(), phrase, 0)
		if err != nil {
			log.Error("search failed", "error", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		total := len(all)

		// ceil(total/limit) without computing total+limit-1, which wraps
		// around for a huge limit.
		pages := total / limit
		if total%limit != 0 {
			pages++
		}
		if pages == 0 {
			pages = 1
		}

		// offset = min((page-1)*limit, total). The product is formed only
		// after the division proves it cannot exceed total.
		offset := total
		if page-1 <= total/limit {
			offset = (page - 1) * limit
		}

		// end = min(offset+limit, total), again without risking overflow.
		end := total
		if limit <= total-offset {
			end = offset + limit
		}

		comics := all[offset:end]
		// Allocated even when empty so the field encodes as [] rather than null.
		items := make([]comicsItem, 0, len(comics))
		for _, c := range comics {
			items = append(items, comicsItem{ID: c.ID, URL: c.URL})
		}
		writeJSON(w, map[string]any{
			"comics": items,
			"total":  total,
			"page":   page,
			"pages":  pages,
		})
	}
}
// NewSearchIndexHandler returns a handler that looks comics up through
// searcher.SearchIndex and answers with {"comics": [...], "total": n}, where
// total is the number of items returned.
//
// Query parameters: "phrase" (required) and "limit" (optional, default 10,
// any non-negative integer). Responses: 400 when phrase is missing or limit is
// invalid, 500 with the error text when the lookup fails (the error is
// logged).
func NewSearchIndexHandler(log *slog.Logger, searcher core.Searcher) http.HandlerFunc {
	type comicsItem struct {
		ID  int    `json:"id"`
		URL string `json:"url"`
	}

	return func(w http.ResponseWriter, r *http.Request) {
		query := r.URL.Query()

		phrase := query.Get("phrase")
		if phrase == "" {
			http.Error(w, "phrase is required", http.StatusBadRequest)
			return
		}

		limit := 10
		if raw := query.Get("limit"); raw != "" {
			parsed, convErr := strconv.Atoi(raw)
			if convErr != nil || parsed < 0 {
				http.Error(w, "invalid limit", http.StatusBadRequest)
				return
			}
			limit = parsed
		}

		found, err := searcher.SearchIndex(r.Context(), phrase, limit)
		if err != nil {
			log.Error("isearch failed", "error", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		// A zero-length result still yields a non-nil slice, i.e. JSON [].
		items := make([]comicsItem, len(found))
		for i, c := range found {
			items[i] = comicsItem{ID: c.ID, URL: c.URL}
		}
		writeJSON(w, map[string]any{
			"comics": items,
			"total":  len(items),
		})
	}
}
// writeJSON sets the Content-Type header to application/json and writes v to
// w as indented JSON. When encoding fails it falls back to http.Error with
// status 500 and the text "internal server error".
func writeJSON(w http.ResponseWriter, v any) {
	w.Header().Set("Content-Type", "application/json")

	encoder := json.NewEncoder(w)
	encoder.SetIndent("", " ")
	encodeErr := encoder.Encode(v)
	if encodeErr == nil {
		return
	}
	http.Error(w, "internal server error", http.StatusInternalServerError)
}
package middleware
import (
"net/http"
"strings"
)
// TokenVerifier validates an access token; a nil result means the token is
// accepted.
type TokenVerifier interface {
	Verify(token string) error
}

// Auth wraps next so that it only runs for requests carrying an Authorization
// header of the form "Token <value>" whose value passes verifier.Verify.
//
// A missing header, a header without the "Token " prefix, an empty value or a
// verification error all produce 401 "unauthorized".
func Auth(next http.HandlerFunc, verifier TokenVerifier) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		token, hasPrefix := strings.CutPrefix(r.Header.Get("Authorization"), "Token ")
		authorized := hasPrefix && token != "" && verifier.Verify(token) == nil
		if !authorized {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next(w, r)
	}
}
package middleware
import (
"net/http"
)
// Concurrency wraps next so that at most limit invocations run at the same
// time. A request that arrives while all slots are taken is not queued: it is
// answered immediately with 503 "service unavailable".
func Concurrency(next http.HandlerFunc, limit int) http.HandlerFunc {
	slots := make(chan struct{}, limit)

	// acquire claims a slot without blocking and reports whether it succeeded.
	acquire := func() bool {
		select {
		case slots <- struct{}{}:
			return true
		default:
			return false
		}
	}

	return func(w http.ResponseWriter, r *http.Request) {
		if !acquire() {
			http.Error(w, "service unavailable", http.StatusServiceUnavailable)
			return
		}
		// Hand the slot back even if next panics.
		defer func() { <-slots }()
		next(w, r)
	}
}
package middleware
import (
"fmt"
"net/http"
"time"
"github.com/VictoriaMetrics/metrics"
)
// responseWriter wraps an http.ResponseWriter and remembers the status code
// passed to WriteHeader so WithMetrics can label its measurements.
type responseWriter struct {
http.ResponseWriter
// code is the last status passed to WriteHeader; WithMetrics initialises it
// to http.StatusOK for handlers that never call WriteHeader.
code int
}
// WriteHeader records statusCode and then forwards the call to the wrapped
// ResponseWriter.
func (rw *responseWriter) WriteHeader(statusCode int) {
rw.code = statusCode
rw.ResponseWriter.WriteHeader(statusCode)
}
// WithMetrics wraps next and records the duration of every request, in
// seconds, in the histogram http_request_duration_seconds labelled with the
// response status and the request's route.
//
// The route label is r.Pattern, or r.URL.Path when the pattern is empty.
// NOTE(review): with the path fallback every distinct unmatched path creates
// its own histogram, so the number of series grows with client-chosen URLs.
func WithMetrics(next http.Handler) http.Handler {
	instrumented := func(w http.ResponseWriter, r *http.Request) {
		recorder := &responseWriter{ResponseWriter: w, code: http.StatusOK}

		began := time.Now()
		next.ServeHTTP(recorder, r)
		elapsed := time.Since(began).Seconds()

		route := r.Pattern
		if route == "" {
			route = r.URL.Path
		}

		name := fmt.Sprintf(`http_request_duration_seconds{status=%q, url=%q}`, fmt.Sprint(recorder.code), route)
		metrics.GetOrCreateHistogram(name).Update(elapsed)
	}
	return http.HandlerFunc(instrumented)
}
package middleware
import (
"net/http"
"golang.org/x/time/rate"
)
// Rate wraps next with a limiter shared by all requests, configured with rps
// events per second and a burst of 1.
//
// Each request waits on the limiter using its own context; when the wait
// returns an error the request is answered with 503 "service unavailable"
// instead of reaching next.
func Rate(next http.HandlerFunc, rps int) http.HandlerFunc {
	bucket := rate.NewLimiter(rate.Limit(rps), 1)

	return func(w http.ResponseWriter, r *http.Request) {
		waitErr := bucket.Wait(r.Context())
		if waitErr == nil {
			next(w, r)
			return
		}
		http.Error(w, "service unavailable", http.StatusServiceUnavailable)
	}
}
package rest
// LoginRequest represents login credentials
// It documents the JSON body shared by the login and register endpoints.
// @Description Login credentials for admin or user
type LoginRequest struct {
// Name is the account name, sent as the "name" JSON property.
Name string `json:"name" example:"admin" minLength:"1"`
// Password is the account password, sent as the "password" JSON property.
Password string `json:"password" example:"password" minLength:"1"`
}
// SearchResponse represents search results
// NOTE(review): NewSearchHandler additionally emits "page" and "pages", which
// this documentation type does not model.
type SearchResponse struct {
// Comics is the list of matching comics.
Comics []ComicsItem `json:"comics"`
// Total is the reported number of matches.
Total int `json:"total" example:"5"`
}
// ComicsItem represents a single comic
type ComicsItem struct {
// ID is the comic identifier.
ID int `json:"id" example:"353"`
// URL is the address associated with the comic (an image URL in the example).
URL string `json:"url" example:"https://imgs.xkcd.com/comics/python.png"`
}
// StatsResponse represents database statistics
// Its JSON keys match the object written by NewUpdateStatsHandler.
type StatsResponse struct {
// WordsTotal is the total number of words.
WordsTotal int `json:"words_total" example:"148234"`
// WordsUnique is the number of distinct words.
WordsUnique int `json:"words_unique" example:"9412"`
// ComicsFetched is the number of comics already fetched.
ComicsFetched int `json:"comics_fetched" example:"3241"`
// ComicsTotal is the total number of comics.
ComicsTotal int `json:"comics_total" example:"3241"`
}
// StatusResponse represents current update status
// Its JSON key matches the object written by NewUpdateStatusHandler.
type StatusResponse struct {
// Status is the update state as a string; the documented values are "idle"
// and "running".
Status string `json:"status" example:"idle" enums:"idle,running"`
}
// AdminLogin godoc
// AdminLogin has an empty body; it exists only to carry the API annotations below.
// @Summary Admin login
// @Description Login with admin credentials (from env ADMIN_USER/ADMIN_PASSWORD), returns JWT token
// @Tags auth
// @Accept json
// @Produce plain
// @Param body body LoginRequest true "Admin credentials"
// @Success 200 {string} string "eyJhbGciOiJIUzI1NiJ9..."
// @Failure 400 {string} string "invalid request body"
// @Failure 401 {string} string "unauthorized"
// @Router /api/login [post]
func AdminLogin() {}
// UserLogin godoc
// UserLogin has an empty body; it exists only to carry the API annotations below.
// @Summary User login
// @Description Login with registered user credentials, returns JWT token valid for searching
// @Tags auth
// @Accept json
// @Produce plain
// @Param body body LoginRequest true "User credentials"
// @Success 200 {string} string "eyJhbGciOiJIUzI1NiJ9..."
// @Failure 400 {string} string "invalid request body"
// @Failure 401 {string} string "unauthorized"
// @Router /api/user/login [post]
func UserLogin() {}
// Register godoc
// Register has an empty body; it exists only to carry the API annotations below.
// NewRegisterHandler also answers 400 "invalid request body" when the JSON
// cannot be decoded.
// @Summary Register user
// @Description Create a new user account with bcrypt-hashed password
// @Tags auth
// @Accept json
// @Produce plain
// @Param body body LoginRequest true "New user credentials"
// @Success 201 {string} string "created"
// @Failure 400 {string} string "name and password are required"
// @Failure 409 {string} string "user already exists or internal error"
// @Router /api/register [post]
func Register() {}
// Search godoc
// Search has an empty body; it exists only to carry the API annotations below.
// NOTE(review): NewSearchHandler does not enforce the documented limit
// maximum of 100, and its response also contains "page" and "pages".
// @Summary Search comics
// @Description Search XKCD comics by phrase using full-text search against the database
// @Tags search
// @Produce json
// @Param phrase query string true "Search phrase" minlength(1)
// @Param limit query int false "Max number of results" default(10) minimum(1) maximum(100)
// @Param page query int false "1-based page number" default(1) minimum(1)
// @Success 200 {object} SearchResponse
// @Failure 400 {string} string "phrase is required"
// @Failure 500 {string} string "internal server error"
// @Router /api/search [get]
func Search() {}
// ISearch godoc
// ISearch has an empty body; it exists only to carry the API annotations below.
// NOTE(review): NewSearchIndexHandler rejects only negative limits and has no
// upper bound, so minimum(1)/maximum(100) may be stricter than the
// implementation — confirm.
// @Summary Index search
// @Description Search XKCD comics using in-memory index, faster but requires index to be built
// @Tags search
// @Produce json
// @Param phrase query string true "Search phrase" minlength(1)
// @Param limit query int false "Max number of results" default(10) minimum(1) maximum(100)
// @Success 200 {object} SearchResponse
// @Failure 400 {string} string "phrase is required"
// @Failure 500 {string} string "internal server error"
// @Router /api/isearch [get]
func ISearch() {}
// DBUpdate godoc
// DBUpdate has an empty body; it exists only to carry the API annotations below.
// In NewUpdateHandler the 202 response corresponds to core.ErrAlreadyExists.
// @Summary Update database
// @Description Fetch new comics from xkcd.com and store them. Runs asynchronously — returns 202 if already running
// @Tags admin
// @Security BearerAuth
// @Success 200 {string} string "ok"
// @Success 202 {string} string "already running"
// @Failure 401 {string} string "unauthorized"
// @Failure 500 {string} string "internal server error"
// @Router /api/db/update [post]
func DBUpdate() {}
// DBStats godoc
// DBStats has an empty body; it exists only to carry the API annotations below.
// NOTE(review): the frontend sends an Authorization header to this endpoint;
// confirm whether a @Security annotation belongs here.
// @Summary Database stats
// @Description Returns total and fetched comics count, total and unique words count
// @Tags admin
// @Produce json
// @Success 200 {object} StatsResponse
// @Failure 500 {string} string "internal server error"
// @Router /api/db/stats [get]
func DBStats() {}
// DBStatus godoc
// DBStatus has an empty body; it exists only to carry the API annotations below.
// NOTE(review): the frontend sends an Authorization header to this endpoint;
// confirm whether a @Security annotation belongs here.
// @Summary Update status
// @Description Returns current update status: idle or running
// @Tags admin
// @Produce json
// @Success 200 {object} StatusResponse
// @Failure 500 {string} string "internal server error"
// @Router /api/db/status [get]
func DBStatus() {}
// DBDrop godoc
// DBDrop has an empty body; it exists only to carry the API annotations below.
// @Summary Drop database
// @Description Delete all comics and words from the database, resets the search index and cache
// @Tags admin
// @Security BearerAuth
// @Success 200 {string} string "ok"
// @Failure 401 {string} string "unauthorized"
// @Failure 500 {string} string "internal server error"
// @Router /api/db [delete]
func DBDrop() {}
package search
import (
"context"
"log/slog"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
"yadro.com/course/api/core"
searchpb "yadro.com/course/proto/search"
)
// Client adapts the generated search gRPC client to the core types used by
// the API.
type Client struct {
log *slog.Logger
// client is the generated gRPC stub every method delegates to.
client searchpb.SearchClient
}
// NewClient creates a gRPC client for the search service at address, using
// insecure (non-TLS) transport credentials and the OpenTelemetry client stats
// handler.
//
// It returns the error reported by grpc.NewClient, if any. The underlying
// connection is not exposed, so the returned Client offers no way to close it.
func NewClient(address string, log *slog.Logger) (*Client, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	}

	conn, err := grpc.NewClient(address, dialOpts...)
	if err != nil {
		return nil, err
	}

	wrapped := &Client{log: log, client: searchpb.NewSearchClient(conn)}
	return wrapped, nil
}
// NewClientWithGRPC wraps an already constructed gRPC stub, which lets callers
// supply their own SearchClient implementation. The returned error is always
// nil.
func NewClientWithGRPC(grpcClient searchpb.SearchClient, log *slog.Logger) (*Client, error) {
return &Client{client: grpcClient, log: log}, nil
}
// Ping calls the search service's Ping RPC and returns its error, discarding
// the reply.
func (c *Client) Ping(ctx context.Context) error {
_, err := c.client.Ping(ctx, &emptypb.Empty{})
return err
}
// Search forwards phrase and limit to the service's Search RPC and converts
// the reply into core.Comics values.
//
// An RPC error is returned unchanged with a nil slice; a successful call
// always returns a non-nil slice, even when it is empty.
func (c *Client) Search(ctx context.Context, phrase string, limit int) ([]core.Comics, error) {
	request := &searchpb.SearchRequest{Phrase: phrase, Limit: int64(limit)}
	resp, err := c.client.Search(ctx, request)
	if err != nil {
		return nil, err
	}

	found := make([]core.Comics, len(resp.Comics))
	for i, item := range resp.Comics {
		found[i] = core.Comics{ID: int(item.Id), URL: item.Url}
	}
	return found, nil
}
// SearchIndex forwards phrase and limit to the service's ISearch RPC and
// converts the reply into core.Comics values.
//
// An RPC error is returned unchanged with a nil slice; a successful call
// always returns a non-nil slice, even when it is empty.
func (c *Client) SearchIndex(ctx context.Context, phrase string, limit int) ([]core.Comics, error) {
	request := &searchpb.SearchRequest{Phrase: phrase, Limit: int64(limit)}
	resp, err := c.client.ISearch(ctx, request)
	if err != nil {
		return nil, err
	}

	found := make([]core.Comics, len(resp.Comics))
	for i, item := range resp.Comics {
		found[i] = core.Comics{ID: int(item.Id), URL: item.Url}
	}
	return found, nil
}
package update
import (
"context"
"errors"
"log/slog"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"yadro.com/course/api/core"
updatepb "yadro.com/course/proto/update"
)
// Client adapts the generated update gRPC client to the core types used by
// the API.
type Client struct {
log *slog.Logger
// client is the generated gRPC stub every method delegates to.
client updatepb.UpdateClient
}
// NewClient creates a gRPC client for the update service at address, using
// insecure (non-TLS) transport credentials and the OpenTelemetry client stats
// handler.
//
// It returns the error reported by grpc.NewClient, if any. The underlying
// connection is not exposed, so the returned Client offers no way to close it.
func NewClient(address string, log *slog.Logger) (*Client, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	}

	conn, err := grpc.NewClient(address, dialOpts...)
	if err != nil {
		return nil, err
	}

	wrapped := &Client{log: log, client: updatepb.NewUpdateClient(conn)}
	return wrapped, nil
}
// NewClientWithGRPC wraps an already constructed gRPC stub, which lets callers
// supply their own UpdateClient implementation.
func NewClientWithGRPC(grpcClient updatepb.UpdateClient, log *slog.Logger) *Client {
return &Client{client: grpcClient, log: log}
}
// Ping calls the update service's Ping RPC and returns its error, discarding
// the reply.
func (c Client) Ping(ctx context.Context) error {
_, err := c.client.Ping(ctx, &emptypb.Empty{})
return err
}
// Status asks the update service for its state and maps the protobuf value to
// a core.UpdateStatus.
//
// STATUS_IDLE and STATUS_RUNNING map to their core counterparts. An RPC
// failure returns core.StatusUpdateUnknown with that error, and any other
// reported value returns core.StatusUpdateUnknown with "unknown status".
func (c Client) Status(ctx context.Context) (core.UpdateStatus, error) {
	reply, err := c.client.Status(ctx, &emptypb.Empty{})
	if err != nil {
		return core.StatusUpdateUnknown, err
	}

	if reply.Status == updatepb.Status_STATUS_IDLE {
		return core.StatusUpdateIdle, nil
	}
	if reply.Status == updatepb.Status_STATUS_RUNNING {
		return core.StatusUpdateRunning, nil
	}
	return core.StatusUpdateUnknown, errors.New("unknown status")
}
// Stats fetches the counters from the update service and converts them into a
// core.UpdateStats. On an RPC failure it returns the zero value and the error.
func (c Client) Stats(ctx context.Context) (core.UpdateStats, error) {
	reply, err := c.client.Stats(ctx, &emptypb.Empty{})
	if err != nil {
		return core.UpdateStats{}, err
	}

	var stats core.UpdateStats
	stats.WordsTotal = int(reply.WordsTotal)
	stats.WordsUnique = int(reply.WordsUnique)
	stats.ComicsFetched = int(reply.ComicsFetched)
	stats.ComicsTotal = int(reply.ComicsTotal)
	return stats, nil
}
// Update triggers the update service's Update RPC.
//
// A gRPC Aborted status is translated to core.ErrAlreadyExists; every other
// error is returned unchanged.
func (c Client) Update(ctx context.Context) error {
	_, err := c.client.Update(ctx, &emptypb.Empty{})
	switch {
	case err == nil:
		return nil
	case status.Code(err) == codes.Aborted:
		return core.ErrAlreadyExists
	default:
		return err
	}
}
// Drop calls the update service's Drop RPC and returns its error, discarding
// the reply.
func (c Client) Drop(ctx context.Context) error {
_, err := c.client.Drop(ctx, &emptypb.Empty{})
return err
}
package words
import (
"context"
"log/slog"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
wordspb "yadro.com/course/proto/words"
)
// Client wraps the generated words gRPC client.
type Client struct {
log *slog.Logger
// client is the generated gRPC stub every method delegates to.
client wordspb.WordsClient
}
// NewClient creates a gRPC client for the words service at address, using
// insecure (non-TLS) transport credentials and the OpenTelemetry client stats
// handler.
//
// It returns the error reported by grpc.NewClient, if any. The underlying
// connection is not exposed, so the returned Client offers no way to close it.
func NewClient(address string, log *slog.Logger) (*Client, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	}

	conn, err := grpc.NewClient(address, dialOpts...)
	if err != nil {
		return nil, err
	}

	wrapped := &Client{log: log, client: wordspb.NewWordsClient(conn)}
	return wrapped, nil
}
// NewClientWithGRPC wraps an already constructed gRPC stub, which lets callers
// supply their own WordsClient implementation.
func NewClientWithGRPC(grpcClient wordspb.WordsClient, log *slog.Logger) *Client {
return &Client{client: grpcClient, log: log}
}
// Norm sends phrase to the words service's Norm RPC and returns the words of
// the reply. An RPC failure is returned unchanged with a nil slice.
func (c Client) Norm(ctx context.Context, phrase string) ([]string, error) {
	request := &wordspb.WordsRequest{Phrase: phrase}
	reply, err := c.client.Norm(ctx, request)
	if err != nil {
		return nil, err
	}
	return reply.Words, nil
}
// Ping calls the words service's Ping RPC and returns its error, discarding
// the reply.
func (c Client) Ping(ctx context.Context) error {
_, err := c.client.Ping(ctx, &emptypb.Empty{})
return err
}
package config
import (
"log"
"time"
"github.com/ilyakaznacheev/cleanenv"
)
// HTTPConfig holds the HTTP server settings of the API service. Each field is
// read from the YAML key or environment variable named in its tag and falls
// back to the env-default value.
type HTTPConfig struct {
// Address is the server address (default "localhost:80").
Address string `yaml:"address" env:"API_ADDRESS" env-default:"localhost:80"`
// Timeout is a duration setting (default 5s); what it bounds is decided by
// the code that consumes this config.
Timeout time.Duration `yaml:"timeout" env:"API_TIMEOUT" env-default:"5s"`
}
// Config is the configuration of the API service. Each field is read from the
// YAML key or environment variable named in its tag and falls back to the
// env-default value.
type Config struct {
// LogLevel is the logging level name (default "DEBUG").
LogLevel string `yaml:"log_level" env:"LOG_LEVEL" env-default:"DEBUG"`
// SearchConcurrency is an integer search setting (default 1), presumably the
// limit handed to the Concurrency middleware — confirm at the call site.
SearchConcurrency int `yaml:"search_concurrency" env:"SEARCH_CONCURRENCY" env-default:"1"`
// SearchRate is an integer search setting (default 1), presumably the rps
// handed to the Rate middleware — confirm at the call site.
SearchRate int `yaml:"search_rate" env:"SEARCH_RATE" env-default:"1"`
// HTTPConfig groups the HTTP server settings under the "api_server" key.
HTTPConfig HTTPConfig `yaml:"api_server"`
// WordsAddress, UpdateAddress and SearchAddress are the addresses of the
// words, update and search services.
WordsAddress string `yaml:"words_address" env:"WORDS_ADDRESS" env-default:"words:81"`
UpdateAddress string `yaml:"update_address" env:"UPDATE_ADDRESS" env-default:"update:82"`
SearchAddress string `yaml:"search_address" env:"SEARCH_ADDRESS" env-default:"search:83"`
// TokenTTL is the lifetime of issued tokens (default 24h).
TokenTTL time.Duration `yaml:"token_ttl" env:"TOKEN_TTL" env-default:"24h"`
// DBAddress is the database address; it is empty by default.
DBAddress string `yaml:"db_address" env:"DB_ADDRESS" env-default:""`
// OTLPEndpoint is the OTLP endpoint; it is empty by default.
OTLPEndpoint string `yaml:"otlp_endpoint" env:"OTLP_ENDPOINT" env-default:""`
}
// MustLoad reads the configuration from configPath with cleanenv.ReadConfig
// and returns it. Any read error terminates the process through log.Fatalf.
func MustLoad(configPath string) Config {
	cfg := Config{}
	err := cleanenv.ReadConfig(configPath, &cfg)
	if err != nil {
		log.Fatalf("cannot read config %q: %s", configPath, err)
	}
	return cfg
}
package config
import (
"log"
"time"
"github.com/ilyakaznacheev/cleanenv"
)
// Config is the configuration of the frontend service. Each field is read from
// the YAML key or environment variable named in its tag and falls back to the
// env-default value.
type Config struct {
// LogLevel is the logging level name (default "DEBUG").
LogLevel string `yaml:"log_level" env:"LOG_LEVEL" env-default:"DEBUG"`
// Address is the frontend server address (default ":8080").
Address string `yaml:"address" env:"FRONTEND_ADDRESS" env-default:":8080"`
// Timeout is a duration setting (default 5s); what it bounds is decided by
// the code that consumes this config.
Timeout time.Duration `yaml:"timeout" env:"FRONTEND_TIMEOUT" env-default:"5s"`
// APIAddress is the base URL of the API service (default "http://api:8080").
APIAddress string `yaml:"api_address" env:"API_ADDRESS" env-default:"http://api:8080"`
}
// MustLoad reads the configuration from configPath with cleanenv.ReadConfig
// and returns it. Any read error terminates the process through log.Fatalf.
func MustLoad(configPath string) Config {
	cfg := Config{}
	err := cleanenv.ReadConfig(configPath, &cfg)
	if err != nil {
		log.Fatalf("cannot read config %q: %s", configPath, err)
	}
	return cfg
}
package handlers
import (
"bytes"
"encoding/json"
"fmt"
"html/template"
"io"
"log/slog"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
// Handler serves the frontend pages and proxies their actions to the API
// service.
type Handler struct {
log *slog.Logger
// apiAddress is the base URL prepended to every API path.
apiAddress string
// templates holds the parsed HTML templates the pages are rendered from.
templates *template.Template
// httpClient is the shared client used for all calls to the API.
httpClient *http.Client
}
// New builds a Handler that talks to the API at apiAddress and renders pages
// from templates. Its HTTP client uses a fixed 10-second timeout.
func New(log *slog.Logger, apiAddress string, templates *template.Template) *Handler {
	client := &http.Client{Timeout: 10 * time.Second}

	h := new(Handler)
	h.log = log
	h.apiAddress = apiAddress
	h.templates = templates
	h.httpClient = client
	return h
}
// SearchPage renders search.html.
//
// It reads "phrase", "limit" (default 10) and "page" (default 1) from the
// query string; unparsable or non-positive numbers silently fall back to the
// defaults. When a phrase is given, the search is run with the user's token;
// without a token the page shows "Please log in to search". Results and
// paging data, or the search error, are passed to the template.
func (h *Handler) SearchPage(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	phrase := query.Get("phrase")

	// positiveOr parses the named query value and returns def unless it is a
	// positive integer.
	positiveOr := func(key string, def int) int {
		if n, err := strconv.Atoi(query.Get(key)); err == nil && n > 0 {
			return n
		}
		return def
	}
	limit := positiveOr("limit", 10)
	page := positiveOr("page", 1)

	token := h.getUserToken(r)
	data := map[string]any{
		"Phrase":   phrase,
		"Limit":    limit,
		"Page":     page,
		"LoggedIn": token != "",
	}

	switch {
	case phrase == "":
		// No phrase: render the empty search form.
	case token == "":
		data["Error"] = "Please log in to search"
	default:
		comics, total, pages, err := h.search(r, phrase, limit, page, token)
		if err != nil {
			data["Error"] = err.Error()
			break
		}
		data["Comics"] = comics
		data["Total"] = total
		data["Pages"] = pages
		data["PrevPage"] = page - 1
		data["NextPage"] = page + 1
		data["HasPrev"] = page > 1
		data["HasNext"] = page < pages
	}

	h.render(w, "search.html", data)
}
// AdminPage renders admin.html.
//
// When a token is present it fetches the database statistics and the update
// status from the API, passing either the value or the error text of each to
// the template. The optional "msg" and "err" query parameters are forwarded
// as Message and Error.
func (h *Handler) AdminPage(w http.ResponseWriter, r *http.Request) {
	token := h.getToken(r)
	loggedIn := token != ""
	data := map[string]any{"LoggedIn": loggedIn}

	if loggedIn {
		if stats, err := h.getStats(r, token); err == nil {
			data["Stats"] = stats
		} else {
			data["StatsError"] = err.Error()
		}

		if status, err := h.getStatus(r, token); err == nil {
			data["Status"] = status
		} else {
			data["StatusError"] = err.Error()
		}
	}

	query := r.URL.Query()
	if notice := query.Get("msg"); notice != "" {
		data["Message"] = notice
	}
	if failure := query.Get("err"); failure != "" {
		data["Error"] = failure
	}

	h.render(w, "admin.html", data)
}
// Login handles the admin login form.
//
// Non-POST requests are redirected to /admin. On POST the "name" and
// "password" form values are sent to the API; a failure redirects to /admin
// with an error message, a success stores the token in the "token" cookie and
// redirects to /admin with a confirmation.
//
// The cookie is HttpOnly and SameSite=Lax: it authorises the state-changing
// POST actions of this handler (Update, Drop), so it must not be attached to
// cross-site form submissions.
func (h *Handler) Login(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Redirect(w, r, "/admin", http.StatusSeeOther)
		return
	}

	name := r.FormValue("name")
	password := r.FormValue("password")
	token, err := h.login(name, password)
	if err != nil {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Login failed: "+err.Error()), http.StatusSeeOther)
		return
	}

	http.SetCookie(w, &http.Cookie{
		Name:     "token",
		Value:    token,
		Path:     "/",
		HttpOnly: true,
		SameSite: http.SameSiteLaxMode,
	})
	http.Redirect(w, r, "/admin?msg="+url.QueryEscape("Logged in successfully"), http.StatusSeeOther)
}
// Logout clears the "token" cookie by overwriting it with an empty, already
// expired one and redirects to /admin.
func (h *Handler) Logout(w http.ResponseWriter, r *http.Request) {
	expired := http.Cookie{
		Name:    "token",
		Value:   "",
		Path:    "/",
		MaxAge:  -1,
		Expires: time.Unix(0, 0),
	}
	http.SetCookie(w, &expired)
	http.Redirect(w, r, "/admin", http.StatusSeeOther)
}
// Update handles the admin "update database" action.
//
// Non-POST requests are redirected to /admin. Without a token the user is
// redirected with "Not authenticated". Otherwise POST /api/db/update is called
// with the token and the outcome is reported through the msg/err query
// parameter of the redirect: 202 means an update is already running, 200 that
// it was started, anything else is shown together with the API's response
// text.
func (h *Handler) Update(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Redirect(w, r, "/admin", http.StatusSeeOther)
		return
	}
	token := h.getToken(r)
	if token == "" {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Not authenticated"), http.StatusSeeOther)
		return
	}

	// Building the request fails when apiAddress is not a valid URL; report
	// that instead of dereferencing a nil request.
	req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, h.apiAddress+"/api/db/update", nil)
	if err != nil {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Update request failed: "+err.Error()), http.StatusSeeOther)
		return
	}
	req.Header.Set("Authorization", "Token "+token)

	resp, err := h.httpClient.Do(req)
	if err != nil {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Update request failed: "+err.Error()), http.StatusSeeOther)
		return
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode == http.StatusAccepted {
		http.Redirect(w, r, "/admin?msg="+url.QueryEscape("Update already running"), http.StatusSeeOther)
		return
	}
	if resp.StatusCode != http.StatusOK {
		// Best effort: a read error merely shortens the message shown.
		body, _ := io.ReadAll(resp.Body)
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Update failed: "+strings.TrimSpace(string(body))), http.StatusSeeOther)
		return
	}
	http.Redirect(w, r, "/admin?msg="+url.QueryEscape("Update started"), http.StatusSeeOther)
}
// Drop handles the admin "drop database" action.
//
// Non-POST requests are redirected to /admin. Without a token the user is
// redirected with "Not authenticated". Otherwise DELETE /api/db is called with
// the token and the outcome is reported through the msg/err query parameter
// of the redirect; a non-200 answer is shown together with the API's response
// text.
func (h *Handler) Drop(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Redirect(w, r, "/admin", http.StatusSeeOther)
		return
	}
	token := h.getToken(r)
	if token == "" {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Not authenticated"), http.StatusSeeOther)
		return
	}

	// Building the request fails when apiAddress is not a valid URL; report
	// that instead of dereferencing a nil request.
	req, err := http.NewRequestWithContext(r.Context(), http.MethodDelete, h.apiAddress+"/api/db", nil)
	if err != nil {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Drop request failed: "+err.Error()), http.StatusSeeOther)
		return
	}
	req.Header.Set("Authorization", "Token "+token)

	resp, err := h.httpClient.Do(req)
	if err != nil {
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Drop request failed: "+err.Error()), http.StatusSeeOther)
		return
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		// Best effort: a read error merely shortens the message shown.
		body, _ := io.ReadAll(resp.Body)
		http.Redirect(w, r, "/admin?err="+url.QueryEscape("Drop failed: "+strings.TrimSpace(string(body))), http.StatusSeeOther)
		return
	}
	http.Redirect(w, r, "/admin?msg="+url.QueryEscape("Database dropped"), http.StatusSeeOther)
}
// comicsItem is one search hit as decoded from the API's search response and
// handed to the search template.
type comicsItem struct {
ID int `json:"id"`
URL string `json:"url"`
}
// search calls GET /api/search with the given phrase, limit and page,
// authenticated by token, and returns the comics of that page together with
// the total match count and the number of pages reported by the API.
//
// It returns an error when the request cannot be built or sent, when the API
// answers with a status other than 200 (the response text is included) or
// when the response body is not the expected JSON.
func (h *Handler) search(r *http.Request, phrase string, limit, page int, token string) ([]comicsItem, int, int, error) {
	apiURL := fmt.Sprintf("%s/api/search?phrase=%s&limit=%d&page=%d", h.apiAddress, url.QueryEscape(phrase), limit, page)

	// Building the request fails when apiAddress is not a valid URL; report
	// that instead of dereferencing a nil request.
	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, apiURL, nil)
	if err != nil {
		return nil, 0, 0, fmt.Errorf("search request failed: %w", err)
	}
	req.Header.Set("Authorization", "Token "+token)

	resp, err := h.httpClient.Do(req)
	if err != nil {
		return nil, 0, 0, fmt.Errorf("search request failed: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		// Best effort: a read error merely shortens the message.
		body, _ := io.ReadAll(resp.Body)
		return nil, 0, 0, fmt.Errorf("search error: %s", strings.TrimSpace(string(body)))
	}

	var result struct {
		Comics []comicsItem `json:"comics"`
		Total  int          `json:"total"`
		Pages  int          `json:"pages"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, 0, 0, fmt.Errorf("failed to parse response: %w", err)
	}
	return result.Comics, result.Total, result.Pages, nil
}
// login posts name and password as JSON to /api/login and returns the token
// from the response body with surrounding whitespace trimmed.
//
// A transport failure is wrapped as "login request failed", any status other
// than 200 is reported as "invalid credentials", and a failure to read the
// body is returned unchanged.
func (h *Handler) login(name, password string) (string, error) {
	creds := map[string]string{"name": name, "password": password}
	payload, _ := json.Marshal(creds)

	resp, err := h.httpClient.Post(h.apiAddress+"/api/login", "application/json", bytes.NewReader(payload))
	if err != nil {
		return "", fmt.Errorf("login request failed: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("invalid credentials")
	}

	raw, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return "", readErr
	}
	return strings.TrimSpace(string(raw)), nil
}
// getStats fetches "/api/db/stats" from the backend and decodes the JSON object
// in the response into a map of counters.
//
// The request is bound to r's context and authenticated with token. Request
// construction failures and non-200 responses are returned as errors instead
// of being decoded as if they were a stats payload.
func (h *Handler) getStats(r *http.Request, token string) (map[string]int, error) {
	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, h.apiAddress+"/api/db/stats", nil)
	if err != nil {
		// Without this check a malformed API address would leave req nil and
		// panic on Header.Set.
		return nil, err
	}
	req.Header.Set("Authorization", "Token "+token)

	resp, err := h.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	// An error response does not carry a stats object; report it explicitly,
	// consistent with the other API helpers in this file.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("stats request: unexpected status code: %d", resp.StatusCode)
	}

	var stats map[string]int
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		return nil, err
	}
	return stats, nil
}
// getStatus fetches "/api/db/status" from the backend and returns the value of
// the "status" field of the JSON response.
//
// The request is bound to r's context and authenticated with token. Request
// construction failures and non-200 responses are returned as errors instead
// of being decoded as if they were a status payload.
func (h *Handler) getStatus(r *http.Request, token string) (string, error) {
	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, h.apiAddress+"/api/db/status", nil)
	if err != nil {
		// Without this check a malformed API address would leave req nil and
		// panic on Header.Set.
		return "", err
	}
	req.Header.Set("Authorization", "Token "+token)

	resp, err := h.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer func() { _ = resp.Body.Close() }()

	// An error response does not carry a status object; report it explicitly,
	// consistent with the other API helpers in this file.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("status request: unexpected status code: %d", resp.StatusCode)
	}

	var result struct {
		Status string `json:"status"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	return result.Status, nil
}
// RegisterPage renders the "register.html" template with an empty data map.
func (h *Handler) RegisterPage(w http.ResponseWriter, r *http.Request) {
	noData := map[string]any{}
	h.render(w, "register.html", noData)
}
// Register handles the registration form: it forwards the "name" and
// "password" form values as JSON to the backend "/api/register" endpoint.
//
// On a 201 response the client is redirected to the login page with a success
// message; any other outcome re-renders "register.html" with an error.
func (h *Handler) Register(w http.ResponseWriter, r *http.Request) {
	// showError re-renders the registration form with msg as the error text.
	showError := func(msg string) {
		h.render(w, "register.html", map[string]any{"Error": msg})
	}

	credentials := map[string]string{
		"name":     r.FormValue("name"),
		"password": r.FormValue("password"),
	}
	payload, _ := json.Marshal(credentials)

	resp, err := h.httpClient.Post(h.apiAddress+"/api/register", "application/json", bytes.NewReader(payload))
	if err != nil {
		showError("Registration failed: " + err.Error())
		return
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode == http.StatusCreated {
		target := "/login?msg=" + url.QueryEscape("Registered successfully, please log in")
		http.Redirect(w, r, target, http.StatusSeeOther)
		return
	}
	showError("User already exists")
}
// UserLoginPage renders "user_login.html". When the URL carries a non-empty
// "msg" query parameter, its value is passed to the template as "Message".
func (h *Handler) UserLoginPage(w http.ResponseWriter, r *http.Request) {
	pageData := make(map[string]any)
	notice := r.URL.Query().Get("msg")
	if notice != "" {
		pageData["Message"] = notice
	}
	h.render(w, "user_login.html", pageData)
}
// UserLogin handles the user login form: it forwards the "name" and "password"
// form values as JSON to the backend "/api/user/login" endpoint.
//
// On a 200 response the trimmed response body is stored in the HttpOnly
// "user_token" cookie and the client is redirected to "/". Transport errors,
// non-200 responses and failures while reading the token re-render
// "user_login.html" with an error message.
func (h *Handler) UserLogin(w http.ResponseWriter, r *http.Request) {
	name := r.FormValue("name")
	password := r.FormValue("password")
	// Marshalling a map[string]string cannot fail, so the error is ignored.
	body, _ := json.Marshal(map[string]string{"name": name, "password": password})

	resp, err := h.httpClient.Post(h.apiAddress+"/api/user/login", "application/json", bytes.NewReader(body))
	if err != nil {
		h.render(w, "user_login.html", map[string]any{"Error": "Login failed: " + err.Error()})
		return
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		h.render(w, "user_login.html", map[string]any{"Error": "Invalid credentials"})
		return
	}

	token, err := io.ReadAll(resp.Body)
	if err != nil {
		// A failed read may yield an empty or truncated token; do not store it
		// in the cookie as if the login had succeeded.
		h.render(w, "user_login.html", map[string]any{"Error": "Login failed: " + err.Error()})
		return
	}

	http.SetCookie(w, &http.Cookie{
		Name:     "user_token",
		Value:    strings.TrimSpace(string(token)),
		Path:     "/",
		HttpOnly: true,
	})
	http.Redirect(w, r, "/", http.StatusSeeOther)
}
// UserLogout expires the "user_token" cookie (empty value, negative MaxAge,
// Expires at the Unix epoch) and redirects the client to "/login".
func (h *Handler) UserLogout(w http.ResponseWriter, r *http.Request) {
	expired := http.Cookie{
		Name:    "user_token",
		Value:   "",
		Path:    "/",
		MaxAge:  -1,
		Expires: time.Unix(0, 0),
	}
	http.SetCookie(w, &expired)
	http.Redirect(w, r, "/login", http.StatusSeeOther)
}
// getUserToken returns the value of the "user_token" cookie, or "" when the
// request does not carry that cookie.
func (h *Handler) getUserToken(r *http.Request) string {
	if cookie, err := r.Cookie("user_token"); err == nil {
		return cookie.Value
	}
	return ""
}
// getToken returns the value of the "token" cookie, or "" when the request
// does not carry that cookie.
func (h *Handler) getToken(r *http.Request) string {
	if cookie, err := r.Cookie("token"); err == nil {
		return cookie.Value
	}
	return ""
}
// render executes the named template with data and writes the result to w as
// "text/html; charset=utf-8".
//
// The template is rendered into a buffer first. If execution fails, nothing
// has been sent yet, so the client receives a clean 500 response instead of a
// partial page followed by an error message.
func (h *Handler) render(w http.ResponseWriter, name string, data any) {
	var page bytes.Buffer
	if err := h.templates.ExecuteTemplate(&page, name, data); err != nil {
		h.log.Error("template render error", "template", name, "error", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	if _, err := page.WriteTo(w); err != nil {
		// The status line is already out; all that can be done is record it.
		h.log.Warn("template write error", "template", name, "error", err)
	}
}
package main
import (
"context"
"errors"
"flag"
"html/template"
"log/slog"
"net/http"
"os"
"os/signal"
"yadro.com/course/frontend/config"
"yadro.com/course/frontend/handlers"
)
// main starts the frontend HTTP server.
//
// It loads the configuration from the file given by -config, parses the HTML
// templates under /templates, registers the page and form routes and serves
// until an interrupt signal arrives. On interrupt the server is shut down
// gracefully and main waits for that shutdown to complete before returning.
func main() {
	var configPath string
	flag.StringVar(&configPath, "config", "config.yaml", "frontend configuration file")
	flag.Parse()

	cfg := config.MustLoad(configPath)
	log := mustMakeLogger(cfg.LogLevel)
	log.Info("starting frontend server")

	tmpl := template.Must(template.ParseGlob("/templates/*.html"))
	h := handlers.New(log, cfg.APIAddress, tmpl)

	mux := http.NewServeMux()
	mux.Handle("GET /static/", http.StripPrefix("/static/", http.FileServer(http.Dir("/static"))))
	mux.HandleFunc("GET /", h.SearchPage)
	mux.HandleFunc("GET /admin", h.AdminPage)
	mux.HandleFunc("POST /admin/login", h.Login)
	mux.HandleFunc("POST /admin/logout", h.Logout)
	mux.HandleFunc("POST /admin/update", h.Update)
	mux.HandleFunc("POST /admin/drop", h.Drop)
	mux.HandleFunc("GET /register", h.RegisterPage)
	mux.HandleFunc("POST /register", h.Register)
	mux.HandleFunc("GET /login", h.UserLoginPage)
	mux.HandleFunc("POST /login", h.UserLogin)
	mux.HandleFunc("POST /logout", h.UserLogout)

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	server := http.Server{
		Addr:              cfg.Address,
		ReadHeaderTimeout: cfg.Timeout,
		Handler:           mux,
	}

	// shutdownDone is closed once the graceful shutdown has finished.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done()
		log.Debug("shutting down frontend server")
		if err := server.Shutdown(context.Background()); err != nil {
			log.Error("erroneous shutdown", "error", err)
		}
	}()

	log.Info("Running HTTP server", "address", cfg.Address)
	if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		// Startup or serve failure: no shutdown is in progress, so do not wait.
		log.Error("server closed unexpectedly", "error", err)
		return
	}

	// ListenAndServe returns ErrServerClosed as soon as Shutdown is called;
	// wait here so in-flight requests are drained before the process exits.
	<-shutdownDone
}
// mustMakeLogger builds a text logger that writes to stderr with source
// locations enabled, at the level named by logLevel.
//
// Accepted names are "DEBUG", "INFO" and "ERROR"; any other value panics.
func mustMakeLogger(logLevel string) *slog.Logger {
	levels := map[string]slog.Level{
		"DEBUG": slog.LevelDebug,
		"INFO":  slog.LevelInfo,
		"ERROR": slog.LevelError,
	}
	level, known := levels[logLevel]
	if !known {
		panic("unknown log level: " + logLevel)
	}
	opts := &slog.HandlerOptions{Level: level, AddSource: true}
	return slog.New(slog.NewTextHandler(os.Stderr, opts))
}
package tracing
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
// Init sets up OpenTelemetry tracing for the process.
//
// It creates an insecure OTLP gRPC exporter for endpoint, builds a tracer
// provider that batches spans to it and tags them with serviceName, and
// installs that provider and the TraceContext propagator globally. The
// returned function is the provider's Shutdown method.
func Init(ctx context.Context, serviceName, endpoint string) (func(context.Context) error, error) {
	exporter, err := otlptracegrpc.New(
		ctx,
		otlptracegrpc.WithEndpoint(endpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, fmt.Errorf("create otlp exporter: %w", err)
	}

	serviceResource := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceName(serviceName),
	)
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(serviceResource),
	)

	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return provider.Shutdown, nil
}
package broker
import (
"context"
"log/slog"
"github.com/nats-io/nats.go"
"yadro.com/course/search/core"
)
// Broker subjects this subscriber listens on.
const (
// topicDBUpdated is the subject of "database updated" events.
topicDBUpdated = "xkcd.db.updated"
// topicDBDropped is the subject of "database dropped" events.
topicDBDropped = "xkcd.db.dropped"
)
// Subscriber listens for database events on a NATS connection and reacts by
// flushing the cache and rebuilding or resetting the search index.
type Subscriber struct {
log *slog.Logger
nc *nats.Conn
// builder rebuilds the index after an "updated" event.
builder core.IndexBuilder
// resetter clears the index after a "dropped" event.
resetter core.IndexResetter
// cache is flushed before the index is rebuilt.
cache core.CacheFlusher
}
// New connects to the NATS server at address and returns a Subscriber wired
// with the given logger, index builder, index resetter and cache flusher.
// The connection error is returned unchanged when the connect fails.
func New(address string, log *slog.Logger, builder core.IndexBuilder, resetter core.IndexResetter, cache core.CacheFlusher) (*Subscriber, error) {
	conn, err := nats.Connect(address)
	if err != nil {
		return nil, err
	}
	log.Info("connected to broker", "address", address)

	sub := &Subscriber{
		log:      log,
		nc:       conn,
		builder:  builder,
		resetter: resetter,
		cache:    cache,
	}
	return sub, nil
}
// Run subscribes to the "updated" and "dropped" topics and processes events
// until ctx is cancelled.
//
// On an "updated" event the cache is flushed and the index rebuilt; on a
// "dropped" event the index is reset. Failures are logged and do not stop the
// loop. Run closes the broker connection when it returns, including when one
// of the subscriptions cannot be established.
func (s *Subscriber) Run(ctx context.Context) {
	// Run owns the connection: release it on every exit path, not only after
	// both subscriptions succeeded.
	defer s.nc.Close()

	updatedCh := make(chan *nats.Msg, 10)
	droppedCh := make(chan *nats.Msg, 10)

	subUpdated, err := s.nc.ChanSubscribe(topicDBUpdated, updatedCh)
	if err != nil {
		s.log.Error("failed to subscribe to updated topic", "error", err)
		return
	}
	// Registered right away so a failure of the second subscription does not
	// leave this one active.
	defer func() { _ = subUpdated.Unsubscribe() }()

	subDropped, err := s.nc.ChanSubscribe(topicDBDropped, droppedCh)
	if err != nil {
		s.log.Error("failed to subscribe to dropped topic", "error", err)
		return
	}
	defer func() { _ = subDropped.Unsubscribe() }()

	s.log.Info("broker subscriber running")
	for {
		select {
		case <-ctx.Done():
			return
		case <-updatedCh:
			s.log.Info("received db updated event, rebuilding index")
			// A failed flush is only a warning: the rebuild is still attempted.
			if err := s.cache.Flush(ctx); err != nil {
				s.log.Warn("failed to flush cache on update", "error", err)
			}
			if err := s.builder.BuildIndex(ctx); err != nil {
				s.log.Error("failed to rebuild index after update", "error", err)
			}
		case <-droppedCh:
			s.log.Info("received db dropped event, resetting index")
			s.resetter.ResetIndex()
		}
	}
}
package cache
import (
"context"
"yadro.com/course/search/core"
)
// Noop is a cache that stores nothing: every lookup is a miss and every write
// or flush succeeds without doing any work.
type Noop struct{}

// Get always reports a miss with no error.
func (Noop) Get(_ context.Context, _ string) ([]core.Comics, bool, error) {
	return nil, false, nil
}

// Set discards the value and reports success.
func (Noop) Set(_ context.Context, _ string, _ []core.Comics) error {
	return nil
}

// Flush does nothing and reports success.
func (Noop) Flush(_ context.Context) error {
	return nil
}
package cache
import (
"context"
"encoding/json"
"time"
"github.com/redis/go-redis/v9"
"yadro.com/course/search/core"
)
// ttl is the expiration passed to Redis for every entry written by Set.
const ttl = 10 * time.Minute
// Redis is a cache of search results backed by a Redis client.
type Redis struct {
client *redis.Client
}
// New creates a Redis client for address and pings the server once with a
// background context. The ping error is returned when the server is
// unreachable.
func New(address string) (*Redis, error) {
	opts := &redis.Options{Addr: address}
	client := redis.NewClient(opts)

	pingErr := client.Ping(context.Background()).Err()
	if pingErr != nil {
		return nil, pingErr
	}
	return &Redis{client: client}, nil
}
// Get loads the value stored under key and decodes it from JSON.
//
// The boolean result reports whether the key was found. A missing key is a
// miss with a nil error; any other Redis error or a decoding failure is
// returned as an error.
func (r *Redis) Get(ctx context.Context, key string) ([]core.Comics, bool, error) {
	raw, err := r.client.Get(ctx, key).Bytes()
	switch {
	case err == redis.Nil:
		// redis.Nil marks an absent key rather than a failure.
		return nil, false, nil
	case err != nil:
		return nil, false, err
	}

	var comics []core.Comics
	if decodeErr := json.Unmarshal(raw, &comics); decodeErr != nil {
		return nil, false, decodeErr
	}
	return comics, true, nil
}
// Set encodes comics as JSON and stores the result under key with the
// package-level ttl as expiration.
func (r *Redis) Set(ctx context.Context, key string, comics []core.Comics) error {
	encoded, err := json.Marshal(comics)
	if err != nil {
		return err
	}
	status := r.client.Set(ctx, key, encoded, ttl)
	return status.Err()
}
// Flush issues FlushDB on the client, clearing the currently selected Redis
// database.
func (r *Redis) Flush(ctx context.Context) error {
	status := r.client.FlushDB(ctx)
	return status.Err()
}
package db
import (
"context"
"log/slog"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"yadro.com/course/search/core"
)
// DB gives the search service access to the comics table through an sqlx
// connection.
type DB struct {
log *slog.Logger
conn *sqlx.DB
}
// New opens a connection to address with the "pgx" driver. A connection
// failure is logged and returned.
func New(log *slog.Logger, address string) (*DB, error) {
	conn, err := sqlx.Connect("pgx", address)
	if err == nil {
		return &DB{log: log, conn: conn}, nil
	}
	log.Error("connection problem", "address", address, "error", err)
	return nil, err
}
// dbComic is the row shape scanned from the search query: the comic id, its
// image URL and the computed "matches" column.
type dbComic struct {
ID int `db:"id"`
ImgURL string `db:"img_url"`
Matches int `db:"matches"`
}
// Search returns the comics whose keywords overlap with keywords, ordered by
// the number of matching keywords, highest first.
//
// A positive limit caps the number of rows; otherwise all matches are
// returned. On success the result is never nil: no matches yields an empty
// slice.
func (db *DB) Search(ctx context.Context, keywords []string, limit int) ([]core.Comics, error) {
	query := `
SELECT id, img_url,
(SELECT count(*) FROM unnest(keywords) k WHERE k = ANY($1)) AS matches
FROM comics
WHERE keywords && $1
ORDER BY matches DESC`
	args := []any{keywords}
	if limit > 0 {
		query += ` LIMIT $2`
		args = append(args, limit)
	}

	rows, err := db.conn.QueryxContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	// Start from an empty, non-nil slice so "no rows" is not reported as nil.
	found := []core.Comics{}
	for rows.Next() {
		var rec dbComic
		if scanErr := rows.StructScan(&rec); scanErr != nil {
			return nil, scanErr
		}
		found = append(found, core.Comics{ID: rec.ID, URL: rec.ImgURL})
	}
	return found, rows.Err()
}
// AllComics reads the id, image URL and keywords of every row in the comics
// table. A scan failure aborts the read; an iteration error is returned
// together with the rows collected so far.
func (db *DB) AllComics(ctx context.Context) ([]core.IndexComic, error) {
	const query = `SELECT id, img_url, keywords FROM comics`

	rows, err := db.conn.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	var comics []core.IndexComic
	for rows.Next() {
		var (
			entry core.IndexComic
			words pq.StringArray
		)
		if scanErr := rows.Scan(&entry.ID, &entry.URL, &words); scanErr != nil {
			return nil, scanErr
		}
		entry.Keywords = []string(words)
		comics = append(comics, entry)
	}
	return comics, rows.Err()
}
package grpc
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
searchpb "yadro.com/course/proto/search"
"yadro.com/course/search/core"
)
// Server is the gRPC front for the search service; it embeds the generated
// UnimplementedSearchServer and delegates to a core.Searcher.
type Server struct {
searchpb.UnimplementedSearchServer
service core.Searcher
}
// NewServer returns a Server that delegates search calls to service.
func NewServer(service core.Searcher) *Server {
	srv := new(Server)
	srv.service = service
	return srv
}
// Ping answers with an empty message and never fails.
func (s *Server) Ping(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
	pong := new(emptypb.Empty)
	return pong, nil
}
// Search runs the service's Search for the requested phrase and limit and
// converts the result into a SearchReply whose Total is the number of comics
// returned. Service errors are reported with the Internal status code.
func (s *Server) Search(ctx context.Context, req *searchpb.SearchRequest) (*searchpb.SearchReply, error) {
	found, err := s.service.Search(ctx, req.Phrase, int(req.Limit))
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	reply := &searchpb.SearchReply{
		Comics: make([]*searchpb.Comics, len(found)),
		Total:  int64(len(found)),
	}
	for i := range found {
		reply.Comics[i] = &searchpb.Comics{Id: int64(found[i].ID), Url: found[i].URL}
	}
	return reply, nil
}
// ISearch runs the service's ISearch for the requested phrase and limit and
// converts the result into a SearchReply whose Total is the number of comics
// returned. Service errors are reported with the Internal status code.
func (s *Server) ISearch(ctx context.Context, req *searchpb.SearchRequest) (*searchpb.SearchReply, error) {
	found, err := s.service.ISearch(ctx, req.Phrase, int(req.Limit))
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	reply := &searchpb.SearchReply{
		Comics: make([]*searchpb.Comics, len(found)),
		Total:  int64(len(found)),
	}
	for i := range found {
		reply.Comics[i] = &searchpb.Comics{Id: int64(found[i].ID), Url: found[i].URL}
	}
	return reply, nil
}
package initiator
import (
"context"
"log/slog"
"time"
"yadro.com/course/search/core"
)
// Initiator triggers index builds: once when Run starts and then on every
// tick of a timer.
type Initiator struct {
log *slog.Logger
builder core.IndexBuilder
// ttl is the interval between periodic rebuilds.
ttl time.Duration
}
// New returns an Initiator that rebuilds the index through builder every ttl.
func New(log *slog.Logger, builder core.IndexBuilder, ttl time.Duration) *Initiator {
	init := &Initiator{
		log:     log,
		builder: builder,
		ttl:     ttl,
	}
	return init
}
// Run builds the index immediately and then rebuilds it every ttl until ctx is
// cancelled.
//
// When ttl is not positive no ticker can be created, so periodic rebuilds are
// disabled: Run logs a warning and simply blocks until ctx is cancelled.
func (i *Initiator) Run(ctx context.Context) {
	i.build(ctx)

	if i.ttl <= 0 {
		// time.NewTicker panics on a non-positive interval; keep Run's
		// lifetime (return on cancellation) instead of crashing.
		i.log.Warn("non-positive index ttl, periodic rebuild disabled", "ttl", i.ttl)
		<-ctx.Done()
		return
	}

	ticker := time.NewTicker(i.ttl)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			i.build(ctx)
		}
	}
}
// build runs one index build and logs, rather than returns, any failure.
func (i *Initiator) build(ctx context.Context) {
	i.log.Info("building search index")
	err := i.builder.BuildIndex(ctx)
	if err == nil {
		return
	}
	i.log.Error("failed to build index", "error", err)
}
package words
import (
"context"
"log/slog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
wordspb "yadro.com/course/proto/words"
)
// Client wraps the generated gRPC client of the words service.
type Client struct {
log *slog.Logger
client wordspb.WordsClient
}
// NewClient creates a gRPC client for address using insecure (plaintext)
// transport credentials and wraps it in a Client.
func NewClient(address string, log *slog.Logger) (*Client, error) {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.NewClient(address, creds)
	if err != nil {
		return nil, err
	}

	c := &Client{log: log}
	c.client = wordspb.NewWordsClient(conn)
	return c, nil
}
// NewClientWithGRPC wraps an already constructed WordsClient; no connection is
// opened here.
func NewClientWithGRPC(client wordspb.WordsClient, log *slog.Logger) *Client {
	c := new(Client)
	c.client = client
	c.log = log
	return c
}
// Norm sends phrase to the words service and returns the Words field of its
// reply. RPC errors are returned unchanged.
func (c *Client) Norm(ctx context.Context, phrase string) ([]string, error) {
	request := &wordspb.WordsRequest{Phrase: phrase}
	reply, err := c.client.Norm(ctx, request)
	if err != nil {
		return nil, err
	}
	return reply.Words, nil
}
package config
import (
"log/slog"
"os"
"time"
"github.com/ilyakaznacheev/cleanenv"
)
// Config holds the search service settings. Each field can come from the YAML
// key or the environment variable named in its tag; env-default gives the
// fallback value.
type Config struct {
LogLevel string `yaml:"log_level" env:"LOG_LEVEL" env-default:"DEBUG"`
Address string `yaml:"search_address" env:"SEARCH_ADDRESS" env-default:"localhost:83"`
DBAddress string `yaml:"db_address" env:"DB_ADDRESS" env-default:"localhost:5432"`
WordsAddress string `yaml:"words_address" env:"WORDS_ADDRESS" env-default:"localhost:81"`
IndexTTL time.Duration `yaml:"index_ttl" env:"INDEX_TTL" env-default:"5m"`
BrokerAddress string `yaml:"broker_address" env:"BROKER_ADDRESS" env-default:"nats://localhost:4222"`
RedisAddress string `yaml:"redis_address" env:"REDIS_ADDRESS" env-default:"localhost:6379"`
OTLPEndpoint string `yaml:"otlp_endpoint" env:"OTLP_ENDPOINT" env-default:""`
}
// MustLoad reads the configuration from configPath. If the file cannot be
// read it falls back to the environment alone; if that fails too, both errors
// are logged and the process exits with status 1.
func MustLoad(configPath string) Config {
	var cfg Config
	if fileErr := cleanenv.ReadConfig(configPath, &cfg); fileErr != nil {
		if envErr := cleanenv.ReadEnv(&cfg); envErr != nil {
			// Report both failures: the environment error is the one that
			// actually made loading impossible.
			slog.Error("cannot read config", "path", configPath, "error", fileErr, "env_error", envErr)
			os.Exit(1)
		}
	}
	return cfg
}
package core
import (
"context"
"fmt"
"log/slog"
"sort"
"sync"
)
// Service implements search over comics, either through the database (with a
// result cache) or through an in-memory keyword index.
type Service struct {
log *slog.Logger
db DB
words Words
cache Cache
// mu guards index.
mu sync.RWMutex
// index maps a keyword to the comics that carry it.
index map[string][]Comics
}
// NewService returns a Service with the given dependencies and an empty
// in-memory index.
func NewService(log *slog.Logger, db DB, words Words, cache Cache) *Service {
	svc := &Service{log: log, db: db, words: words, cache: cache}
	svc.index = make(map[string][]Comics)
	return svc
}
// Search normalizes phrase into keywords and looks the comics up in the
// database, using the cache keyed by the raw phrase and limit.
//
// A phrase that normalizes to no keywords yields an empty result. Cache read
// errors are treated as a miss; cache write errors are only logged.
func (s *Service) Search(ctx context.Context, phrase string, limit int) ([]Comics, error) {
	keywords, err := s.words.Norm(ctx, phrase)
	if err != nil {
		return nil, fmt.Errorf("failed to normalize phrase: %w", err)
	}
	if len(keywords) == 0 {
		return []Comics{}, nil
	}

	cacheKey := fmt.Sprintf("search:%s:%d", phrase, limit)
	cached, hit, cacheErr := s.cache.Get(ctx, cacheKey)
	if cacheErr == nil && hit {
		s.log.Debug("cache hit", "key", cacheKey)
		return cached, nil
	}

	found, err := s.db.Search(ctx, keywords, limit)
	if err != nil {
		return nil, err
	}
	if setErr := s.cache.Set(ctx, cacheKey, found); setErr != nil {
		s.log.Warn("cache set failed", "error", setErr)
	}
	return found, nil
}
// ResetIndex replaces the in-memory index with an empty one and then flushes
// the cache with a background context. A flush failure is only logged.
func (s *Service) ResetIndex() {
	empty := make(map[string][]Comics)

	s.mu.Lock()
	s.index = empty
	s.mu.Unlock()

	flushErr := s.cache.Flush(context.Background())
	if flushErr != nil {
		s.log.Warn("cache flush failed", "error", flushErr)
	}
	s.log.Info("index reset")
}
// BuildIndex loads all comics from the database, builds a fresh
// keyword-to-comics map and swaps it in under the write lock. The previous
// index stays in place if loading fails.
func (s *Service) BuildIndex(ctx context.Context) error {
	comics, err := s.db.AllComics(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch comics for index: %w", err)
	}

	// Build off to the side so readers are only blocked for the swap itself.
	fresh := make(map[string][]Comics, len(comics))
	for _, entry := range comics {
		item := Comics{ID: entry.ID, URL: entry.URL}
		for _, keyword := range entry.Keywords {
			fresh[keyword] = append(fresh[keyword], item)
		}
	}

	s.mu.Lock()
	s.index = fresh
	s.mu.Unlock()

	s.log.Info("index built", "keywords", len(fresh))
	return nil
}
// ISearch normalizes phrase into keywords and answers from the in-memory
// index.
//
// Comics are ranked by how many index entries of the query keywords reference
// them, highest first; comics with the same score are ordered by ascending ID
// so that repeated calls (and the limit cut-off) give the same result. A
// positive limit caps the number of comics returned. A phrase that normalizes
// to no keywords yields an empty result.
func (s *Service) ISearch(ctx context.Context, phrase string, limit int) ([]Comics, error) {
	keywords, err := s.words.Norm(ctx, phrase)
	if err != nil {
		return nil, fmt.Errorf("failed to normalize phrase: %w", err)
	}
	if len(keywords) == 0 {
		return []Comics{}, nil
	}

	// Collect per-comic hit counts while holding the read lock only for the
	// index traversal.
	s.mu.RLock()
	counts := make(map[int]int)
	seen := make(map[int]Comics)
	for _, kw := range keywords {
		for _, c := range s.index[kw] {
			counts[c.ID]++
			seen[c.ID] = c
		}
	}
	s.mu.RUnlock()

	type scored struct {
		c Comics
		n int
	}
	result := make([]scored, 0, len(counts))
	for id, n := range counts {
		result = append(result, scored{seen[id], n})
	}

	// Map iteration order is random and sort.Slice is not stable, so ties must
	// be broken explicitly to get a deterministic ranking.
	sort.Slice(result, func(i, j int) bool {
		if result[i].n != result[j].n {
			return result[i].n > result[j].n
		}
		return result[i].c.ID < result[j].c.ID
	})
	if limit > 0 && len(result) > limit {
		result = result[:limit]
	}

	comics := make([]Comics, len(result))
	for i, r := range result {
		comics[i] = r.c
	}
	return comics, nil
}
package broker
import (
"log/slog"
"github.com/nats-io/nats.go"
)
// Broker subjects used by the publisher.
const (
// TopicDBUpdated is the subject PublishUpdated sends to.
TopicDBUpdated = "xkcd.db.updated"
// TopicDBDropped is the subject PublishDropped sends to.
TopicDBDropped = "xkcd.db.dropped"
)
// Publisher sends database events over a NATS connection.
type Publisher struct {
log *slog.Logger
nc *nats.Conn
}
// New connects to the NATS server at address and returns a Publisher that
// uses the connection. The connection error is returned unchanged on failure.
func New(address string, log *slog.Logger) (*Publisher, error) {
	conn, err := nats.Connect(address)
	if err != nil {
		return nil, err
	}
	log.Info("connected to broker", "address", address)

	pub := &Publisher{log: log, nc: conn}
	return pub, nil
}
// PublishUpdated publishes an "updated" event on TopicDBUpdated and flushes
// the connection. Failures are logged, not returned; the success message is
// logged only when both the publish and the flush succeeded.
func (p *Publisher) PublishUpdated() {
	if err := p.nc.Publish(TopicDBUpdated, []byte("updated")); err != nil {
		p.log.Error("failed to publish updated event", "error", err)
		return
	}
	if err := p.nc.Flush(); err != nil {
		p.log.Error("failed to flush updated event", "error", err)
		// Do not claim success after a failed flush.
		return
	}
	p.log.Info("published db updated event")
}
// PublishDropped publishes a "dropped" event on TopicDBDropped and flushes the
// connection. Failures are logged, not returned; the success message is
// logged only when both the publish and the flush succeeded.
func (p *Publisher) PublishDropped() {
	if err := p.nc.Publish(TopicDBDropped, []byte("dropped")); err != nil {
		p.log.Error("failed to publish dropped event", "error", err)
		return
	}
	if err := p.nc.Flush(); err != nil {
		p.log.Error("failed to flush dropped event", "error", err)
		// Do not claim success after a failed flush.
		return
	}
	p.log.Info("published db dropped event")
}
// Close closes the underlying broker connection.
func (p *Publisher) Close() {
	conn := p.nc
	conn.Close()
}
package db
import (
"embed"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/pgx"
"github.com/golang-migrate/migrate/v4/source/iofs"
)
// migrationFiles holds the SQL files matched by migrations/*.sql, embedded
// into the binary; Migrate reads them from the "migrations" directory.
//
//go:embed migrations/*.sql
var migrationFiles embed.FS
// Migrate applies all pending migrations from the embedded "migrations"
// directory to the connected database.
//
// migrate.ErrNoChange is not treated as a failure; any other error from
// running the migrations is logged and returned.
func (db *DB) Migrate() error {
	db.log.Debug("running migration")

	// Source: the SQL files embedded in the binary.
	source, err := iofs.New(migrationFiles, "migrations")
	if err != nil {
		return err
	}
	// Target: the already open database connection.
	target, err := pgx.WithInstance(db.conn.DB, &pgx.Config{})
	if err != nil {
		return err
	}
	migrator, err := migrate.NewWithInstance("iofs", source, "pgx", target)
	if err != nil {
		return err
	}

	switch upErr := migrator.Up(); {
	case upErr == nil:
		// Migrations were applied.
	case upErr == migrate.ErrNoChange:
		db.log.Debug("migration did not change anything")
	default:
		db.log.Error("migration failed", "error", upErr)
		return upErr
	}
	db.log.Debug("migration finished")
	return nil
}
package db
import (
"context"
"log/slog"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/jmoiron/sqlx"
"yadro.com/course/update/core"
)
// DB gives the update service access to the comics table through an sqlx
// connection.
type DB struct {
log *slog.Logger
conn *sqlx.DB
}
// New opens a connection to address with the "pgx" driver. A connection
// failure is logged and returned.
func New(log *slog.Logger, address string) (*DB, error) {
	conn, err := sqlx.Connect("pgx", address)
	if err == nil {
		return &DB{log: log, conn: conn}, nil
	}
	log.Error("connection problem", "address", address, "error", err)
	return nil, err
}
// Add inserts one comic (id, image URL and keywords) into the comics table and
// returns the error of the INSERT, if any.
func (db *DB) Add(ctx context.Context, comics core.Comics) error {
	const insert = `
INSERT INTO comics (id, img_url, keywords)
VALUES ($1, $2, $3)`

	if _, err := db.conn.ExecContext(ctx, insert, comics.ID, comics.URL, comics.Words); err != nil {
		return err
	}
	return nil
}
// dbStats is the single row returned by the statistics query in Stats; the db
// tags match the column aliases of that query.
type dbStats struct {
WordsTotal int `db:"words_total"`
WordsUnique int `db:"words_unique"`
ComicsFetched int `db:"comics_fetched"`
}
// Stats runs one query that counts the stored comics, sums the lengths of
// their keyword arrays and counts the distinct keywords.
//
// The query error, if any, is returned alongside whatever values were
// scanned.
func (db *DB) Stats(ctx context.Context) (core.DBStats, error) {
	const query = `
SELECT
(SELECT count(*) FROM comics) as comics_fetched,
(SELECT COALESCE(SUM(array_length(keywords, 1)), 0) FROM comics) as words_total,
(SELECT count(DISTINCT word) FROM comics, unnest(keywords) as word) as words_unique
`
	var row dbStats
	err := db.conn.GetContext(ctx, &row, query)

	stats := core.DBStats{
		WordsTotal:    row.WordsTotal,
		WordsUnique:   row.WordsUnique,
		ComicsFetched: row.ComicsFetched,
	}
	return stats, err
}
// IDs returns the ids of all stored comics in ascending order.
func (db *DB) IDs(ctx context.Context) ([]int, error) {
	const query = "SELECT id FROM comics ORDER BY id"

	var stored []int
	if err := db.conn.SelectContext(ctx, &stored, query); err != nil {
		return nil, err
	}
	return stored, nil
}
// Drop removes every row from the comics table with TRUNCATE.
func (db *DB) Drop(ctx context.Context) error {
	if _, err := db.conn.ExecContext(ctx, "TRUNCATE TABLE comics"); err != nil {
		return err
	}
	return nil
}
package grpc
import (
"context"
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
updatepb "yadro.com/course/proto/update"
"yadro.com/course/update/core"
)
// NewServer returns a Server that delegates update calls to service.
func NewServer(service core.Updater) *Server {
	srv := new(Server)
	srv.service = service
	return srv
}
// Server is the gRPC front for the update service; it embeds the generated
// UnimplementedUpdateServer and delegates to a core.Updater.
type Server struct {
updatepb.UnimplementedUpdateServer
service core.Updater
}
// Ping answers with an empty message and never fails.
func (s *Server) Ping(_ context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
	pong := new(emptypb.Empty)
	return pong, nil
}
// Status maps the service status to its protobuf value: running and idle are
// translated directly, anything else becomes STATUS_UNSPECIFIED.
func (s *Server) Status(ctx context.Context, _ *emptypb.Empty) (*updatepb.StatusReply, error) {
	reply := &updatepb.StatusReply{Status: updatepb.Status_STATUS_UNSPECIFIED}

	current := s.service.Status(ctx)
	if current == core.StatusRunning {
		reply.Status = updatepb.Status_STATUS_RUNNING
	} else if current == core.StatusIdle {
		reply.Status = updatepb.Status_STATUS_IDLE
	}
	return reply, nil
}
// Update triggers an update run on the service.
//
// core.ErrUpdateRunning is reported as Aborted ("update in progress"); any
// other error is reported as Internal with the error text.
func (s *Server) Update(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
	switch err := s.service.Update(ctx); {
	case errors.Is(err, core.ErrUpdateRunning):
		return nil, status.Error(codes.Aborted, "update in progress")
	case err != nil:
		return nil, status.Error(codes.Internal, err.Error())
	default:
		return &emptypb.Empty{}, nil
	}
}
// Stats returns the service statistics converted to the protobuf reply.
// Service errors are reported with the Internal status code.
func (s *Server) Stats(ctx context.Context, _ *emptypb.Empty) (*updatepb.StatsReply, error) {
	current, err := s.service.Stats(ctx)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	reply := new(updatepb.StatsReply)
	reply.WordsTotal = int64(current.WordsTotal)
	reply.WordsUnique = int64(current.WordsUnique)
	reply.ComicsTotal = int64(current.ComicsTotal)
	reply.ComicsFetched = int64(current.ComicsFetched)
	return reply, nil
}
// Drop asks the service to drop the stored data. Service errors are reported
// with the Internal status code.
func (s *Server) Drop(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
	err := s.service.Drop(ctx)
	if err == nil {
		return &emptypb.Empty{}, nil
	}
	return nil, status.Error(codes.Internal, err.Error())
}
package words
import (
"context"
"log/slog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
wordspb "yadro.com/course/proto/words"
)
// Client wraps the generated gRPC client of the words service.
type Client struct {
log *slog.Logger
client wordspb.WordsClient
}
// NewClient creates a gRPC client for address using insecure (plaintext)
// transport credentials and wraps it in a Client.
func NewClient(address string, log *slog.Logger) (*Client, error) {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.NewClient(address, creds)
	if err != nil {
		return nil, err
	}

	c := &Client{log: log}
	c.client = wordspb.NewWordsClient(conn)
	return c, nil
}
// NewClientWithGRPC wraps an already constructed WordsClient; no connection is
// opened here.
func NewClientWithGRPC(client wordspb.WordsClient, log *slog.Logger) *Client {
	c := new(Client)
	c.client = client
	c.log = log
	return c
}
// Norm sends phrase to the words service and returns the Words field of its
// reply. RPC errors are returned unchanged.
func (c Client) Norm(ctx context.Context, phrase string) ([]string, error) {
	request := &wordspb.WordsRequest{Phrase: phrase}
	reply, err := c.client.Norm(ctx, request)
	if err != nil {
		return nil, err
	}
	return reply.Words, nil
}
// Ping calls the words service's Ping RPC and returns its error, if any.
func (c Client) Ping(ctx context.Context) error {
	if _, err := c.client.Ping(ctx, &emptypb.Empty{}); err != nil {
		return err
	}
	return nil
}
package xkcd
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"
"yadro.com/course/update/core"
)
// lastPath is appended to the base URL by LastID to fetch the info document
// whose "num" field is returned as the last id.
const lastPath = "/info.0.json"
// Client fetches comic metadata over HTTP from the configured base URL.
type Client struct {
log *slog.Logger
client http.Client
// url is the base URL that request paths are appended to.
url string
}
// NewClient returns a Client for the given base url whose HTTP client uses
// timeout as its request timeout. An empty url is rejected with an error.
func NewClient(url string, timeout time.Duration, log *slog.Logger) (*Client, error) {
	if len(url) == 0 {
		return nil, fmt.Errorf("empty base url specified")
	}

	c := &Client{log: log, url: url}
	c.client.Timeout = timeout
	return c, nil
}
// xkcdResponse lists the fields decoded from an info.0.json document.
type xkcdResponse struct {
Num int `json:"num"`
Img string `json:"img"`
Title string `json:"title"`
Alt string `json:"alt"`
Transcript string `json:"transcript"`
}
// Get downloads "<base>/<id>/info.0.json" and converts it to core.XKCDInfo.
//
// The description is the title, alt text and transcript joined by single
// spaces. A 404 response is reported as "comic not found", any other non-200
// status as an unexpected status code.
func (c Client) Get(ctx context.Context, id int) (core.XKCDInfo, error) {
	var none core.XKCDInfo

	endpoint := fmt.Sprintf("%s/%d/info.0.json", c.url, id)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return none, err
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return none, err
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			c.log.Error("failed to close response body", "error", closeErr)
		}
	}()

	switch resp.StatusCode {
	case http.StatusOK:
		// Fall through to decoding below.
	case http.StatusNotFound:
		return none, fmt.Errorf("comic not found")
	default:
		return none, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	var payload xkcdResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&payload); decodeErr != nil {
		return none, decodeErr
	}

	info := core.XKCDInfo{
		ID:          payload.Num,
		URL:         payload.Img,
		Description: fmt.Sprintf("%s %s %s", payload.Title, payload.Alt, payload.Transcript),
	}
	return info, nil
}
// LastID downloads "<base>/info.0.json" and returns its "num" field. Any
// non-200 status is reported as an unexpected status code.
func (c Client) LastID(ctx context.Context) (int, error) {
	endpoint := c.url + lastPath
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return 0, err
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return 0, err
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			c.log.Error("failed to close response body", "error", closeErr)
		}
	}()

	if status := resp.StatusCode; status != http.StatusOK {
		return 0, fmt.Errorf("unexpected status code: %d", status)
	}

	var payload xkcdResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&payload); decodeErr != nil {
		return 0, decodeErr
	}
	return payload.Num, nil
}
package config
import (
"log"
"time"
"github.com/ilyakaznacheev/cleanenv"
)
// XKCD holds the settings of the comic source: base URL, concurrency, request
// timeout and check period. Each field can come from the YAML key or the
// environment variable named in its tag; env-default gives the fallback.
type XKCD struct {
URL string `yaml:"url" env:"XKCD_URL" env-default:"xkcd.com"`
Concurrency int `yaml:"concurrency" env:"XKCD_CONCURRENCY" env-default:"1"`
Timeout time.Duration `yaml:"timeout" env:"XKCD_TIMEOUT" env-default:"10s"`
CheckPeriod time.Duration `yaml:"check_period" env:"XKCD_CHECK_PERIOD" env-default:"1h"`
}
// Config holds the update service settings. Each field can come from the YAML
// key or the environment variable named in its tag; env-default gives the
// fallback value.
type Config struct {
LogLevel string `yaml:"log_level" env:"LOG_LEVEL" env-default:"DEBUG"`
Address string `yaml:"update_address" env:"UPDATE_ADDRESS" env-default:"localhost:80"`
XKCD XKCD `yaml:"xkcd"`
DBAddress string `yaml:"db_address" env:"DB_ADDRESS" env-default:"localhost:82"`
WordsAddress string `yaml:"words_address" env:"WORDS_ADDRESS" env-default:"localhost:81"`
BrokerAddress string `yaml:"broker_address" env:"BROKER_ADDRESS" env-default:"nats://localhost:4222"`
}
// MustLoad reads the configuration from configPath. If the file cannot be
// read it falls back to the environment alone; if that fails too, the process
// is terminated with a message that names both errors.
func MustLoad(configPath string) Config {
	var cfg Config
	if fileErr := cleanenv.ReadConfig(configPath, &cfg); fileErr != nil {
		if envErr := cleanenv.ReadEnv(&cfg); envErr != nil {
			// Report both failures: the environment error is the one that
			// actually made loading impossible.
			log.Fatalf("cannot read config %q: %s (environment fallback: %s)", configPath, fileErr, envErr)
		}
	}
	return cfg
}
package core
import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"
)
// Service downloads missing comics, stores them with their normalized
// keywords and publishes database events.
type Service struct {
log *slog.Logger
db DB
xkcd XKCD
words Words
publisher Publisher
// concurrency is the number of worker goroutines started by Update.
concurrency int
// isRunning is true while an Update call is in progress.
isRunning atomic.Bool
}
// NewService returns a Service with the given dependencies. A concurrency
// below 1 is rejected with an error.
func NewService(
	log *slog.Logger, db DB, xkcd XKCD, words Words, publisher Publisher, concurrency int,
) (*Service, error) {
	if concurrency < 1 {
		return nil, fmt.Errorf("wrong concurrency specified: %d", concurrency)
	}

	svc := &Service{log: log, concurrency: concurrency}
	svc.db = db
	svc.xkcd = xkcd
	svc.words = words
	svc.publisher = publisher
	return svc, nil
}
// Update downloads every comic that is not yet stored and saves it with its
// normalized keywords.
//
// Only one update may run at a time: a concurrent call returns
// ErrUpdateRunning. The set of missing ids is every id from 1 to the last id
// reported by the source that is not in the database; id 404 is always
// skipped (NOTE(review): presumably because that comic does not exist —
// confirm). Failures for individual comics are logged and skipped. When there
// was something to fetch, an "updated" event is published after the workers
// have finished.
func (s *Service) Update(ctx context.Context) error {
	if !s.isRunning.CompareAndSwap(false, true) {
		return ErrUpdateRunning
	}
	defer s.isRunning.Store(false)

	lastID, err := s.xkcd.LastID(ctx)
	if err != nil {
		return fmt.Errorf("failed to get last ID: %w", err)
	}
	existingIDs, err := s.db.IDs(ctx)
	if err != nil {
		return fmt.Errorf("failed to get existing IDs: %w", err)
	}

	existingSet := make(map[int]struct{}, len(existingIDs))
	for _, id := range existingIDs {
		existingSet[id] = struct{}{}
	}
	missingIDs := make([]int, 0)
	for id := 1; id <= lastID; id++ {
		if id == 404 {
			continue
		}
		if _, exists := existingSet[id]; !exists {
			missingIDs = append(missingIDs, id)
		}
	}
	if len(missingIDs) == 0 {
		return nil
	}

	jobs := make(chan int)
	go func() {
		defer close(jobs)
		for _, id := range missingIDs {
			// Workers stop reading once ctx is cancelled; without the Done
			// case this goroutine would block on the send forever.
			select {
			case jobs <- id:
			case <-ctx.Done():
				return
			}
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < s.concurrency; i++ {
		wg.Go(func() {
			for id := range jobs {
				if ctx.Err() != nil {
					return
				}
				comic, err := s.xkcd.Get(ctx, id)
				if err != nil {
					s.log.Warn("Failed to fetch comic", "id", id, "error", err)
					continue
				}
				keywords, err := s.words.Norm(ctx, comic.Description)
				if err != nil {
					s.log.Warn("Failed to normalize words", "id", id, "error", err)
					continue
				}
				if err := s.db.Add(ctx, Comics{
					ID:    comic.ID,
					URL:   comic.URL,
					Words: keywords,
				}); err != nil {
					s.log.Warn("Failed to save comic", "id", id, "error", err)
				}
			}
		})
	}
	wg.Wait()

	s.publisher.PublishUpdated()
	return nil
}
// Stats combines the database statistics with the total number of comics at
// the source. The total is the last id, reduced by one when the last id is at
// least 404 (the id that Update skips).
func (s *Service) Stats(ctx context.Context) (ServiceStats, error) {
	stored, err := s.db.Stats(ctx)
	if err != nil {
		return ServiceStats{}, err
	}
	total, err := s.xkcd.LastID(ctx)
	if err != nil {
		return ServiceStats{}, err
	}
	if total >= 404 {
		total -= 1
	}

	stats := ServiceStats{DBStats: stored, ComicsTotal: total}
	return stats, nil
}
// Status reports StatusRunning while an update is in progress and StatusIdle
// otherwise.
func (s *Service) Status(ctx context.Context) ServiceStatus {
	running := s.isRunning.Load()
	if !running {
		return StatusIdle
	}
	return StatusRunning
}
// Drop clears the database and, only when that succeeded, publishes a
// "dropped" event.
func (s *Service) Drop(ctx context.Context) error {
	err := s.db.Drop(ctx)
	if err == nil {
		s.publisher.PublishDropped()
	}
	return err
}
package words
import (
"strings"
"unicode"
"github.com/kljensen/snowball/english"
)
// Norm turns phrase into a list of unique stemmed words.
//
// Letters and digits are lower-cased and every other rune becomes a space;
// the text is then split on whitespace. Tokens of two bytes or fewer are
// dropped, the rest are stemmed with the English snowball stemmer, and each
// stem is kept once, in order of first appearance. The result is nil when no
// token survives.
func Norm(phrase string) []string {
	// normalize keeps letters and digits (lower-cased) and blanks out the rest.
	normalize := func(r rune) rune {
		if !unicode.IsLetter(r) && !unicode.IsDigit(r) {
			return ' '
		}
		return unicode.ToLower(r)
	}

	var stems []string
	known := make(map[string]struct{})
	for _, token := range strings.Fields(strings.Map(normalize, phrase)) {
		if len(token) < 3 {
			continue
		}
		stem := english.Stem(token, false)
		if _, dup := known[stem]; dup {
			continue
		}
		known[stem] = struct{}{}
		stems = append(stems, stem)
	}
	return stems
}