Spracovanie veľkých XML feedov od dodávateľov: Ako nepreťažiť server

Od nolimeo · 13. marca 2026
banner image

Pre stredné a veľké e-shopy je integrácia dodávateľských dát základnou podmienkou fungovania. Keď však obchod začne spolupracovať s desiatkami veľkoobchodných dodávateľov, z ktorých každý poskytuje XML alebo JSON feed s desiatkami tisíc produktov vrátane variantov, cien a skladových zásob, narazíte na tvrdý technologický limit.

Tradičný prístup je jednoduchá cron úloha, ktorá každú noc stiahne XML súbory, načíta ich do pamäte, preiteruje produkty a vykoná tisíce SQL dopytov do databázy. Pri veľkých súboroch sa servery preťažujú, RAM alokácia presahuje povolené limity, CPU špičky rastú a databázový fond spojení (connection pool) sa vyčerpá. Výsledkom môžu byť uzamknuté tabuľky databázy (row-level / table-level locking), pre ktoré reálni zákazníci na webe namiesto rýchleho načítania stránky vidia chybové hlásenie 504 Gateway Timeout.

V technologickom štúdiu nolimeo pomáhame firmám nahrádzať tieto krehké integrácie robustnejšou asynchrónnou architektúrou. V tomto inžinierskom rozbore sa pozrieme na to, prečo klasické synchronizácie zlyhávajú, ako pracovať s Node.js streamami bez zahltenia operačnej pamäte a ako vybudovať priepustnú spracovateľskú pipeline s využitím fronty BullMQ (Redis) a PostgreSQL (Drizzle ORM).


1. Architektúra oddeleného a asynchrónneho spracovania

Základným pravidlom pri spracovaní veľkých objemov dát je oddelenie od hlavného aplikačného vlákna. Hlavný e-shopový server, ktorý obsluhuje požiadavky používateľov na frontende (Next.js, Medusa.js), nemá byť zaťažený výpočtovo náročným parsovaním XML súborov a synchrónnym zápisom do databázy.

Naše riešenie spočíva v prechode na distribuovanú a asynchrónnu pipeline, kde parsovanie súboru a ukladanie do databázy prebieha v oddelených procesoch (workeroch) riadených robustnou správou front.

 [ XML Feed URL (Dodávateľ) ]
             │
             ▼ (HTTP Stream - Fetch API)
  [ Node.js Readable Stream ]
             │
             ▼ (Pipeline / Pipe)
  [ XML Stream Parser (obmedzená RAM) ]  ───► (Backpressure mechanizmus)
             │
             ▼ (Dávkovanie po 100 kusoch)
  [ Redis / BullMQ Fronta ]
             │
    ┌────────┴────────┐
    ▼ (Worker 1)      ▼ (Worker 2)  ───► (Riadená paralelnosť / Concurrency)
  [ PostgreSQL DB (Drizzle UPSERT) ]

Prečo táto architektúra funguje?

  1. Znižuje riziko výpadkov RAM: XML súbor sa nenačítava do pamäte ako celok. Streamuje sa po malých častiach (chunks) priamo z HTTP spojenia do parsera. Memory footprint tak môže zostať stabilný aj pri výrazne väčších feedoch.
  2. Tlmi databázové špičky: Sťahovanie a zápis nie sú synchrónne prepojené. Ak databáza nestíha zapisovať, fronta (BullMQ/Redis) funguje ako tlmič (buffer). Produkty sa ukladajú do fronty rýchlo a workeri ich následne ukladajú do databázy riadeným tempom.
  3. Typová bezpečnosť a validácia: Každý produkt z feedu je pred zápisom prehnaný cez prísnu validáciu (Zod), ktorá odfiltruje chybné ceny, chýbajúce EAN kódy alebo poškodené dáta bez toho, aby musel spadnúť celý import.

2. Prečo fs.readFile() zlyháva: Fyzika pamäte a Backpressure

Ak v Node.js použijete metódu fs.readFile() alebo stiahnete celý feed do pamäte ako jeden obrovský reťazec, engine V8 alokuje veľké množstvo RAM. XML súbor môže po naparsovaní do pamäťového objektu (DOM stromu) spotrebovať násobne viac pamäte než jeho pôvodná veľkosť. Je to spôsobené tým, že každý XML element, atribút a textový uzol je v JavaScripte reprezentovaný ako objekt s vlastnými metódami a metadátami.

Riešením je streamovanie a parsovanie za behu (Event-driven / SAX parsing). V tomto režime čítame súbor po malých častiach a spúšťame udalosti len pri prechode konkrétnymi uzlami XML.

Význam spätného tlaku (Backpressure)

Pri streamovaní dát narážame na problém rozdielnej rýchlosti: čítanie dát zo siete alebo z disku je spravidla omnoho rýchlejšie ako zápis dát do Redis fronty alebo databázy. Ak by sme čítali dáta bez obmedzenia, pamäťový buffer by sa preplnil a aplikácia by spadla na nedostatok pamäte (OOM - Out of Memory).

Node.js prúdové prepojenie (pipeline) implementuje automatické riadenie spätného tlaku. Ak zapisovací stream signalizuje, že jeho vnútorný buffer je plný (vráti false pri zápise), čítací stream sa dočasne pozastaví, kým sa buffer neuvoľní.


3. Technická implementácia v TypeScripte: Stream, Queue a Worker

Nižšie uvádzame ukážku TypeScript kódu pre Node.js. Skript streamuje XML feed priamo z HTTP spojenia, asynchrónne filtruje produkty pomocou stream parsera, overuje ich štruktúru cez knižnicu Zod na ochranu integrity databázy a odosiela ich v dávkach do fronty BullMQ na spracovanie workerom.

3.1. Stream Parser a plnenie BullMQ fronty

Pre efektívne spracovanie streamu XML použijeme balík fast-xml-parser v kombinácii s natívnymi Node.js streamami.

// src/services/feed-processor.service.ts
import { Readable, pipeline } from "stream";
import { promisify } from "util";
import { XMLParser } from "fast-xml-parser";
import { Queue } from "bullmq";
import IORedis from "ioredis";
import { z } from "zod";

const streamPipeline = promisify(pipeline);

// Redis pripojenie pre frontu BullMQ
const redisConnection = new IORedis(process.env.REDIS_URL || "redis://127.0.0.1:6379");
const productQueue = new Queue("product-import-queue", { connection: redisConnection });

// Strict schema pre validáciu produktov z feedu
export const ProductFeedSchema = z.object({
  sku: z.string().min(3),
  name: z.string().min(1),
  price: z.preprocess((val) => Number(val), z.number().positive()),
  stock: z.preprocess((val) => Number(val), z.number().int().nonnegative()),
  ean: z.string().optional(),
});

export type ValidatedProduct = z.infer<typeof ProductFeedSchema>;

/**
 * Hlavná služba na stiahnutie a pamäťovo úsporné parsovanie XML feedu
 */
export async function downloadAndParseFeed(feedUrl: string): Promise<void> {
  const response = await fetch(feedUrl);
  if (!response.ok || !response.body) {
    throw new Error(`Zlyhalo stiahnutie feedu zo servera dodávateľa: ${response.statusText}`);
  }

  // Prevod Fetch ReadableStream na Node.js Readable stream
  const reader = Readable.from(response.body as any);
  
  // Konfigurácia fast-xml-parser pre asynchrónne parsovanie streamu
  const parser = new XMLParser({
    ignoreAttributes: false,
    attributeNamePrefix: "",
  });

  let productBuffer: ValidatedProduct[] = [];
  const BATCH_SIZE = 100;

  // Vytvoríme vlastný transformačný stream pre extrakciu a validáciu produktov
  const filterTransform = new Readable({
    objectMode: true,
    read() {} // Vyžadované rozhraním streamu
  });

  // Udalostné spracovanie dát priamo z toku
  let leftoverData = "";
  
  reader.on("data", async (chunk: Buffer) => {
    const data = leftoverData + chunk.toString("utf-8");
    const matches = data.matchAll(/<product>([\s\S]*?)<\/product>/g);
    
    let lastIndex = 0;
    for (const match of matches) {
      if (match.index !== undefined) {
        try {
          const parsed = parser.parse(`<product>${match[1]}</product>`);
          const rawProduct = parsed.product;
          
          // Validácia typu pomocou Zod na ochranu databázy pred chybami dodávateľa
          const validated = ProductFeedSchema.safeParse({
            sku: rawProduct.code || rawProduct.sku,
            name: rawProduct.name || rawProduct.title,
            price: rawProduct.price,
            stock: rawProduct.stock_quantity || rawProduct.stock,
            ean: rawProduct.ean,
          });

          if (validated.success) {
            productBuffer.push(validated.data);
            
            // Plnenie fronty v optimalizovaných balíkoch (batches)
            if (productBuffer.length >= BATCH_SIZE) {
              const batchToQueue = [...productBuffer];
              productBuffer = [];
              
              await productQueue.add("import-products-batch", { products: batchToQueue }, {
                removeOnComplete: true,
                attempts: 3,
                backoff: { type: "exponential", delay: 2000 }
              });
            }
          }
        } catch (parseError) {
          console.warn("Chyba parsovania XML bloku:", parseError);
        }
        lastIndex = match.index + match[0].length;
      }
    }
    leftoverData = data.substring(lastIndex);
  });

  reader.on("end", async () => {
    // Odoslanie zvyšných produktov z bufferu
    if (productBuffer.length > 0) {
      await productQueue.add("import-products-batch", { products: productBuffer }, {
        removeOnComplete: true,
        attempts: 3,
      });
    }
    console.log("XML stream úspešne dočítaný a rozdelený do fronty.");
  });

  await streamPipeline(reader, new Readable({ read() {} }));
}

3.2. Asynchrónny BullMQ worker a PostgreSQL Drizzle ORM zápis

Worker beží ako samostatná mikro-služba na pozadí, prijíma dávky z fronty a vykonáva optimalizovaný, atomický hromadný UPSERT v PostgreSQL, aby sme predišli zbytočným individuálnym zápisom a preťaženiu databázy.

// src/workers/import.worker.ts
import { Worker, Job } from "bullmq";
import IORedis from "ioredis";
import { db } from "@/db/drizzle"; // Drizzle DB klient
import { products } from "@/db/schema"; // Drizzle schema
import { sql } from "drizzle-orm";
import { ValidatedProduct } from "../services/feed-processor.service";

const redisConnection = new IORedis(process.env.REDIS_URL || "redis://127.0.0.1:6379");

/**
 * Hromadný zápis produktov do databázy pomocou Drizzle ORM UPSERT syntaxe
 */
async function processBatchToDatabase(productsBatch: ValidatedProduct[]) {
  // Optimalizovaný hromadný zápis pre minimalizáciu I/O operácií
  await db.insert(products).values(
    productsBatch.map((p) => ({
      sku: p.sku,
      name: p.name,
      price: sql`${p.price}::numeric`,
      stock: p.stock,
      ean: p.ean || null,
      updatedAt: new Date(),
    }))
  ).onConflictDoUpdate({
    target: products.sku,
    set: {
      name: sql`EXCLUDED.name`,
      price: sql`EXCLUDED.price`,
      stock: sql`EXCLUDED.stock`,
      ean: sql`EXCLUDED.ean`,
      updatedAt: new Date(),
    }
  });
}

// Inicializácia asynchrónneho workera so striktným riadením paralelných úloh
const importWorker = new Worker(
  "product-import-queue",
  async (job: Job<{ products: ValidatedProduct[] }>) => {
    const { products } = job.data;
    
    if (!products || products.length === 0) {
      return;
    }

    try {
      await processBatchToDatabase(products);
      console.log(`Úspešne spracovaný balík produktov. Job ID: ${job.id}, veľkosť dávky: ${products.length}`);
    } catch (dbError: any) {
      console.error(`Databázová chyba pri zápise dávky (Job: ${job.id}):`, dbError.message);
      // Chybu vyhodíme, aby BullMQ označilo job ako zlyhaný a aktivovalo retry mechanizmus
      throw dbError;
    }
  },
  {
    connection: redisConnection,
    concurrency: 5, // Maximálne 5 paralelných databázových zápisov na jedného workera
    limiter: {
      max: 100, // Zamedzenie zahlteniu databázy (Rate limiting)
      duration: 1000,
    }
  }
);

importWorker.on("failed", (job, err) => {
  console.error(`Job zlyhal po všetkých pokusoch. ID: ${job?.id}. Dôvod: ${err.message}`);
  // Tu môžeme implementovať logovanie do monitorovacieho nástroja alebo odoslať notifikáciu
});

4. Riešenie hraničných stavov a zlyhaní pri sieťových importoch

Senior vývojári vedia, že integrácie tretích strán zlyhávajú pravidelne. Ak pipeline presne nerieši, čo sa má stať pri chybe, systém bude nestabilný.

Nižšie sú uvedené tri časté kritické scenáre a spôsoby, ako ich asynchrónne riešime v technologickom štúdiu nolimeo:

4.1. Server dodávateľa uprostred streamovania vráti 504 Gateway Timeout alebo preruší spojenie

XML feedy sú generované dynamicky na strane dodávateľa. Ak dodávateľ nestíha, spojenie sa preruší v polovici a parser dostane nekompletný, poškodený XML kód.

  • Riešenie: Neprepisujeme ostré produkčné dáta priamo počas sťahovania. Namiesto toho najprv stiahneme XML feed do dočasného úložiska na serveri (/tmp/import_feed.xml). Až po úspešnom stiahnutí bez sieťových chýb spustíme proces parsovania z lokálneho disku. Tým výrazne znižujeme riziko, že neúplný prenos poškodí databázu alebo vymaže produkty, ktoré v pretrhnutom feede chýbali.

4.2. Nevalidné XML dáta a chybné dátové typy (Broken Feed)

Dodávateľ zmení formát cien (napr. namiesto desatinnej bodky použije čiarku) alebo pošle nevalidnú XML syntax, ktorá spôsobí pád parsera.

  • Riešenie: Dead Letter Queues (DLQ). BullMQ má vstavaný mechanizmus automatických pokusov s exponenciálnym spätným čakaním (exponential backoff). Ak balík zlyhá pre preťaženú databázu, worker to vyskúša znova podľa nastavenej retry stratégie. Ak však balík zlyhá pre nevalidnú štruktúru dát, napríklad po chybe validácie v Zod, job sa neopakuje donekonečna, ale presunie sa do oddelenej fronty (Dead Letter Queue), kde naň upozorní monitoring. Validné produkty z ostatných dávok môžu pokračovať v importe bez prerušenia celého procesu.

4.3. Uzamykanie databázy (Database Row/Table Locking)

Ak spustíte príliš veľa paralelných workerov, ktoré sa snažia urobiť UPSERT do tej istej tabuľky produktov, databázový stroj PostgreSQL začne hlásiť výnimky typu Deadlock detected. Transakcie sa navzájom zablokujú pri čakaní na zámky riadkov.

  • Riešenie: Riadená paralelnosť (concurrency limit). V konfigurácii BullMQ workera nastavujeme parameter concurrency: 5 alebo nižší podľa hardvéru databázy a reálneho zaťaženia. Aj keď máme k dispozícii dostatočný výpočtový výkon pre beh väčšieho počtu procesov, asynchrónny „škrtiaci ventil“ pomáha dávkovať databázové zápisy tak, aby zbytočne nezamykali tabuľky a nespomaľovali používateľské prostredie e-shopu.

5. Prečo sa vyhnúť nestabilným no-code platformám a šablónam

V mnohých firmách sa stretávame s pokusmi o automatizáciu týchto náročných procesov pomocou klikacích riešení ako Zapier alebo Make.com. Pre menšie e-shopy to môže fungovať, no pre stredné a väčšie B2B platformy je to technologická slepá ulička.

  • Vysoké náklady pri väčších objemoch: Pri sťahovaní desiatok tisíc produktov denne spotrebujete veľké množstvo operácií. Poplatky za no-code platformy môžu rýchlo narásť na úroveň, ktorá presahuje náklady na vlastnú spracovateľskú infraštruktúru.
  • Absencia spätného tlaku (no backpressure): Tieto platformy často nepracujú so spätným tlakom tak, ako ho potrebujete pri veľkých streamoch. Keď im pošlete veľký súbor, môžu zlyhať na limite pamäte alebo zahltiť cieľové API priveľkým počtom paralelných požiadaviek.
  • Chýbajúca kontrola nad chybami: Pri zlyhaní uprostred prenosu často nemáte možnosť urobiť bezpečný rollback transakcie alebo izolovať chybné riadky. Výsledkom môžu byť nekonzistentné stavy, keď časť skladu ukazuje staré ceny a časť nové.

V štúdiu nolimeo staviame na type-safe riešeniach s vlastným kódom, ktoré vaša firma vlastní a vie ich technicky aj zmluvne kontrolovať. Pri väčších objemoch dát tak nezávisíte od limitov klikacej platformy a viete presne riadiť každý dôležitý dátový tok.


Záver: Získajte kontrolu nad vašimi dátovými tokmi

Spracovanie XML feedov nie je o tom, ako rýchlo dokážete prečítať súbor. Ide o to, ako bezpečne a predvídateľne dostať dáta zo servera dodávateľa do vašej databázy bez toho, aby import zbytočne spomaľoval web pre zákazníkov.

Ak váš súčasný e-shop trpí nočnými výpadkami počas synchronizácie, ak vývojári neustále lepia chyby v starých cron skriptoch, alebo ak platíte neúmerne vysoké poplatky za nestabilné no-code platformy, je čas prejsť na modernú asynchrónnu architektúru.

Sme technologické štúdio nolimeo, špecializovaný tím senior vývojárov. Klienti u nás komunikujú priamo s ľuďmi, ktorí architektúru navrhujú aj implementujú. Zameriavame sa na spoľahlivý, výkonný kód, rýchle reakčné časy pri správne navrhnutej infraštruktúre a stabilné prepojenia na ERP systémy, ktoré firme pomáhajú znižovať technologický dlh.

Napíšte nám a prejdeme si dodávateľské feedy, ERP integrácie, dátové objemy aj bezpečný technický smer pre asynchrónne spracovanie importov.

Máte záujem posunúť váš projekt vpred?