[NestJS] ระบบ Queue แบบ Request-Reply ง่ายๆด้วย NestJS

Image placeholder
แวะมาทักทายกันได้


สมมติว่า
มีข้อมูลที่ต้องส่งไปให้ Server ก้อนใหญ่สักก้อนนึง เช่น 100,000 ข้อความ (ลองคิดดูว่า ถ้าต้องส่งข้อมูล จำนวน 100,000 ข้อความไปให้ Serverในครั้งเดียว Server ก็คงทำงานไม่ทันแน่ๆ) หรือ Server ถูกกำหนดเอาไว้ว่าจะรับ Message เพียงแค่ 1,000 ข้อความนั้นนะ เพราะทำงานไม่ไหวแน่ๆ

จากโจทย์แบบนี้ เราจะทำอย่างไรได้บ้างในบทความนี้จะนำ NestJs มาแก้ปัญหากัน 

วิธีแก้ปัญหาอย่างง่ายๆ และได้ทดลองมา เป็นวิธีที่เรียกว่า  Request / Reply Model เดี๋ยวจะกล่าวในหัวข้อถัดไป 

วัตถุประสงค์ในเนื้อหานี้ จะเป็นการนำวิธี Implement และ Config ระบบ Queue  ด้วย NestJS มาทดลองการใช้งาน ให้ได้ตามโจทย์และความต้องการของผู้เขียน ซึ่งก่อนอื่น คงต้องเล่าเนื้อหาที่เกี่ยวข้องบางส่วนก่อน 

ฝากกดโฆษณา Google Ads สัก click  เพื่อเป็นกำลังใจแก่ผู้เขียนด้วยนะครับ

Message Queue Concept

โดยหลักการพื้นฐานในเรื่องของ รับ-ส่งข้อความในระบบ Queue (Messaging Queue) จะมีส่วนประกอบ คือ


Queue หรือ Channel - ช่องทางหรือตัวเชื่อมระหว่าง ผู้ส่ง (Producer, Sender) และ ผู้รับ (Consumer, Receiver)  

Producer หรือ Sender - ทำหน้าที่ในการส่งข้อความไปยัง Message Queue

Consumer หรือ Receiver ทำหน้าที่รับข้อความจาก Message Queue ซึ่งสามารถรับได้แบบ ถูกเลือกให้รับ หรือ เลือกที่จะรับก็ได้ 



เมื่อมีการส่งข้อความไปยัง Message Queue ตามภาพด้านบน ข้อความจะถูกจัดเก็บเอาไว้ชั่วคราวเพื่อรอการส่งต่อไปยัง Consumer 

ในรูปแบบการส่งในเนื้อหาบทความนี้ จะอยู่ในเรื่องของ Point-To-Point (PTP) จากการสรุปของพี่สมเกียรติ ตามด้านล่างนั้น ค่อนข้างที่จะตรงกับการ Implement ในบทความนี้จึงยกมาอ้างอิงตามด้านล่าง

"Message จะถูก consume จาก consumer เดียวเท่านั้น ถึงแม้จะมี consumer มากกว่า 1 ตัวก็ตาม ทำให้เรามั่นใจได้ว่า จะไม่มี consumer มารับ message ไปทำงานซ้ำแน่นอน การส่งข้อมูลจะตัวกลางซึ่งเราจะเรียกว่า queue (FIFO – First In First Out)" 

"Request/Reply model คือเมื่อส่งข้อมูลเข้า queue แล้วจะต้องรอการตอบกลับจาก consomer ด้วยว่าเป็นอย่างไร นั่นทำให้ producer และ consumer ผูกมัดกัน หรืออาจจะเรียกได้ว่า ทำงานแบบ Synchronous"


เมื่อเข้าใจเนื้อหาเบื้องต้นที่พูดถึงในบทความนี้กันแล้วจะมาพูด ข้อดีที่จะช่วยให้เข้าใจว่าทำไมจึงต้องใช้ระบบคิว มันช่วยอะไรคุณได้บ้าง เป็นยกข้อดีของการใช้ระบบคิวมาเนื้อหาของ Queue NestJs

  • สามารถเพิ่มงานเข้าไปในระบบคิว เพื่อทำงานพร้อมๆกันได้ และยังเพิ่มคิวใหม่ได้ตลอดเวลา
  • มีฟังก์ชันสร้างความน่าเชื่อถือของระบบ ตัวอย่างเช่น มี listening ค่อยฟัง Event ต่างๆ ทั้ง Complete, Error, Fail ให้สามารถจัดการ Task (Job) ต่างๆ ให้ใช้งานได้อย่างน่าเชื่อถือ


NestJs จะใช้ Bull Library ในการทำระบบ Queue Management ซึ่งจะใช้ Redis เป็น Persist Job ให้เก็บข้อความ หรือ ทำ Message Queue งาน ดัง flow ตามภาพด้านล่าง Producer จะส่งข้อความไปให้ Redis ที่ทำหน้าที่เป็น Message Queue แล้วจึงค่อยส่งข้อความไปยัง Consumer เพื่อ Process ต่อไป 



จาก Flow เราก็จะมา Code Producer และ Consumer กัน


ติดตั้ง Nest Queue ใน Project


$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull


Config เรียกใช้  Module ที่ไฟล์ app.module.ts

ตรง forRoot method จะเป็นการ Register bull package ให้เข้าใน application และสามารถ config Properties ในการใช้งานได้ 


นี้คือตัวอย่างที่ผู้เขียนตั้งค่าเอาไว้ Module อธิบายได้ดังนี้
limiter คือการกำหนด Job ให้ทำงานทีละเท่าไร หลังจากที่ Queue Complete สมมติว่าผู้เขียนอยากให้ queue แต่ละ queue ทำงานให้เสร็จทีละ queue ก่อนที่จะทำ queue ถัดไปก็กำหนดให้ max เท่ากับ 1 หรือถ้าต้องการกำหนดค่าอื่นๆ queue ก็จะทำงานพร้อมๆกันตามจำนวนที่กำหนด

ในส่วนของ backoffStrategies

ใช้ในกรณีที่ queue fail โดยกำหนดชื่อ และ callback



Building Producer 

ในส่วนของ Producer ก็เหมือนกับเป็น Controller ตัวหนึ่งหากใครเคยเขียน Rest Api ด้วย Nest มาบ้าง ก็จะคล้ายๆกัน โดยที่เราสามารถที่จะกำหนด Method POST, GET เข้ามา โดยที่เราจะต้อง Injectable Queue Package เอาไว้ที่ controller 

constructor(@InjectQueue('audio') private readonly audioQueue: Queue) { }


จำลองว่ามี Message เข้ามาในระบบจำนวน 38 Message ผู้เขียนสร้างเอาไว้เป็น Array

let array1 = [
"message1", "message2", "message3",
"message4", "message5", "message6", "message7", "message8",
"message9", "message10", "message11", "message12", "message13",
"message14", "message15", "message16",
"message17", "message18", "message19", "message20", "message21",
"message22", "message23", "message24",
"message25", "message26", "message27", "message28", "message29",
"message30", "message31", "message32",
"message33", "message34", "message35", "message36", "message37",
"message38"
];


สมมติว่าจากโจทย์ที่ตั้งเอาไว้ตอนต้นว่า กรณีที่ Server กำหนดให้ส่ง Message มาได้แค่ 10 Message เราจะต้องทำอย่างไร วิธีการคือ ทำ Chunk ให้กับชุด Message  โดยส่งชุด Array เข้า function นี้ แล้วกำหนด ChunkSize ว่าในแต่ละชุดจะให้มีอย่างละกี่ Array

function chunkArray(array, chunkSize) {
return Array.from(
{ length: Math.ceil(array.length / chunkSize) },
(_, index) => array.slice(index * chunkSize, (index + 1) * chunkSize)
);
}


จากนั้น ก็เรียก Method Add() ของ Queue Instance  this.audioQueue.add () โดย Argument Parameter จะมี add('ชื่อของ process', Payload , JobOption)  ในตัวอย่างด้านล่างจะกำหนด ชื่อของ Process ไว้ว่า transcode ชื่อตรงนี้จะต้องนำไป ไว้ที่ Consumer ด้วยเพื่อรับ Message ในส่วนของ Payload ก็จะเป็น Function ในการแบ่ง Array ทั้งนี้เพื่อ add เข้าไปใน Queue และสุดท้ายจะเป็น JobOption ผู้เขียนได้กำหนด Setting Job นี้ไว้ว่า attempts : 3 หมายถึง จะทำ Re-Queue ของ JobId เดิม 3 ครั้ง กรณีที่ Queue Fail เราสามารถ กำหนด Fail Job ได้จาก Consumer ส่วน backoff คือ ค่าหน่วงเวลา ของแต่ละครั้งที่ re-queue จะเป็นการกำหนดด้วย Callback ชื่อ jitter สามารถตั้งชื่ออะไรก็ได้ เอาไว้ที่ Module ที่ผู้เขียนเอาไว้ในส่วนของ Module

for (let i = 0; i < chunkArray(array1, chunkSize).length; i++) {

this.audioQueue.add(
'transcode',
chunkArray(array1, chunkSize)[i],
{ // setting JOB
attempts: 3,
backoff: {
type: 'jitter',
}
}

);

}


import { CreateTaskDTO } from './dto/create-task.dto';
import { InjectQueue, OnGlobalQueueCompleted } from '@nestjs/bull';
import { Body, Controller, Post, Get, Injectable } from '@nestjs/common';
import { Job, Queue } from 'bull';
import { job } from 'cron';

@Controller('audio')
export class AudioController {
constructor(@InjectQueue('audio') private readonly audioQueue: Queue) { }

@Get('test')
async transcode() { // @Body() createTaskDTO: CreateTaskDTO

let array1 = [
"message1", "message2", "message3", "message4", "message5",
        "message6", "message7", "message8", "message9", "message10",
        "message11", "message12", "message13",
        "message14", "message15", "message16",
        "message17", "message18", "message19",
        "message20", "message21",
        "message22", "message23", "message24",
        "message25", "message26", "message27",
        "message28", "message29",
        "message30", "message31", "message32",
        "message33", "message34", "message35",
        "message36", "message37", "message38" ];

let chunkSize = 5;

for (let i = 0; i < chunkArray(array1, chunkSize).length; i++) {

this.audioQueue.add(
'transcode',
chunkArray(array1, chunkSize)[i],
{ // setting JOB
attempts: 3,
backoff: {
type: 'jitter',
}
}

);

}

let waiting = await this.audioQueue.getWaiting();
waiting.forEach(function (message) {
console.log("waiting_id is " + message.id);
console.log("waiting_data is " + message.data);
});

function chunkArray(array, chunkSize) {
return Array.from(
{ length: Math.ceil(array.length / chunkSize) },
(_, index) => array.slice(index * chunkSize, (index + 1) * chunkSize)
);
}

}

}


Building Consumer

ในส่วนของ processorจะต้องกำหนด audio ซึ่งจะตรงกับ  @InjectQueue('audio') ใน producer

และต้องกำหนด transcode เหมือนเป็น channel ที่รับจาก producer เช่นกัน จากนั้นก็นำ job ที่ได้จากการ callback มา handle message ว่าจะเอาไปทำอะไรต่อนั้นเอง

@Injectable()
@Processor('audio')
export class AudioProcessor {
constructor(private appService: AppService) { }
@Process('transcode')
async transcode(job: Job<unknown>) {
await this.appService.listCall()
.then(data => {
job.moveToCompleted('done', true);
// RUN PROCESS
this.logger.debug('jobId already Run On Process : ' + job.id);
})
.catch(err => {
job.moveToFailed({ message: 'fail' });
});
}

}


ถ้า อยากทำ Event Listening ด้วยก็สามารถทำได้ โดยเติมเพื่อ  loging หรือ event handle 

@OnQueueWaiting()
onWaiting(jobId: string) {
this.logger.debug(`OnQueueWaiting : ${jobId}`);
}


คราวนี้ก็มาดูผลลัพท์กัน (อย่าลืมติดตั้ง Redis Server แล้ว start มันด้วยนะ)



จะสังเกตุได้ว่า มีทั้ง queue ที่รออยู่และ queue ที่ active แล้ว


จริงๆแล้ว NestJs ยังสามารถ Handle Queue ได้หลากหลาย สามารถที่จะดูจาก document 

REF:

https://docs.nestjs.com/techniques/queues

https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue

https://github.com/OptimalBits/bull/blob/master/PATTERNS.md#custom-backoff-strategy


แวะมาทักทายกันได้
donate

Categories: Tutorial Tags: #nestjs , 6039