Previously I've been able to crawl and index web pages for my search engine without a problem, until my database grew more than what RabbitMQ's message queue was capable of holding. If a message in a message queue exceeds its default size, RabbitMQ will throw an error and panic, I could change the default size but that would not scale if my database grows, so in order for users to crawl web pages without having to worry about the message broker crashing.
Creating Segments
I've implemented a function to create segments with a maximum segment size or MSS from the same principles from TCP when creating segments, the segment contains an 8 byte header where each 4 byte of the 8 byte header is the sequence number and the total segment count, and the rest of the body is the payload of the segmented database.
// MSS is number in bytes function createSegments( webpages: Array<Webpage>, // webpages queried from database MSS: number, ): Array<ArrayBufferLike> { const text_encoder = new TextEncoder(); const encoded_text = text_encoder.encode(JSON.stringify(webpages)); const data_length = encoded_text.byteLength; let currentIndex = 0; let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder let segments: Array<ArrayBufferLike> = []; let pointerPosition = MSS; for (let i = 0; i < segmentCount; i++) { let currentDataLength = Math.abs(currentIndex - data_length); let slicedArray = encoded_text.slice(currentIndex, pointerPosition); currentIndex += slicedArray.byteLength; // Add to offset MSS to point to the next segment in the array // manipulate pointerPosition to adjust to lower values using Math.min() // Is current data length enough to fit MSS? // if so add from current position + MSS // else get remaining of the currentDataLength pointerPosition += Math.min(MSS, currentDataLength); const payload = new Uint8Array(slicedArray.length); payload.set(slicedArray); segments.push(newSegment(i, segmentCount, Buffer.from(payload))); } return segments; } function newSegment( sequenceNum: number, segmentCount: number, payload: Buffer, ): ArrayBufferLike { // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount const sequenceNumBuffer = convertIntToBuffer(sequenceNum); const segmentCountBuffer = convertIntToBuffer(segmentCount); const headerBuffer = new ArrayBuffer(8); const header = new Uint8Array(headerBuffer); header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer])); return Buffer.concat([header, payload]); } function convertIntToBuffer(int: number): Buffer { const bytes = Buffer.alloc(4); bytes.writeIntLE(int, 0, 4); console.log(bytes); return bytes; }
Parsing incoming segments
This method of creating small segments of a large dataset would help scale the database query even if the database grows.
Now how does the search engine parse the buffer and transform each segments into a web page array?
Reading from segment buffers
First extract the segment header, since the header contains 2 properties namely Sequence number and Total Segments,
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) { byteReader := bytes.NewBuffer(buf) headerOffsets := []int{0, 4} newSegmentHeader := SegmentHeader{} for i := range headerOffsets { buffer := make([]byte, 4) _, err := byteReader.Read(buffer) if err != nil { return &SegmentHeader{}, err } value := binary.LittleEndian.Uint32(buffer) // this feels disgusting but i dont feel like bothering with this if i == 0 { newSegmentHeader.SequenceNum = value continue } newSegmentHeader.TotalSegments = value } return &newSegmentHeader, nil } func GetSegmentPayload(buf []byte) ([]byte, error) { headerOffset := 8 byteReader := bytes.NewBuffer(buf[headerOffset:]) return byteReader.Bytes(), nil }
Handling retransmission and requeuing of segments
The sequence number will be used for retransmission/requeuing of the segments, so if the expected sequence number is not what was received then re-queue every segment starting from the current one.
// for retransmission/requeuing if segmentHeader.SequenceNum != expectedSequenceNum { ch.Nack(data.DeliveryTag, true, true) log.Printf("Expected Sequence number %d, got %d\n", expectedSequenceNum, segmentHeader.SequenceNum) continue }
Appending segment payloads
The total segment will be used for breaking out of listening to the producer (database service) if the total number of segments received by the search engine is equal to the length of the total segments that is to be sent by the database service then break out and parse the aggregated segment buffer, if not the keep listening and append the segment payload buffer to a web page buffer to hold bytes from all of the incoming segments.
segmentCounter++ fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments) fmt.Printf("current segments : %d\n", segmentCounter) expectedSequenceNum++ ch.Ack(data.DeliveryTag, false) webpageBytes = append(webpageBytes, segmentPayload...) fmt.Printf("Byte Length: %d\n", len(webpageBytes)) if segmentCounter == segmentHeader.TotalSegments { log.Printf("Got all segments from Database %d", segmentCounter) break }
I use vim btw
Thank you for coming to my ted talk, I will be implementing more features and fixes for zensearch.
The above is the detailed content of Scaling Zensearchs capabilities to query the whole database. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undress AI Tool
Undress images for free

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

In Go language, calling a structure method requires first defining the structure and the method that binds the receiver, and accessing it using a point number. After defining the structure Rectangle, the method can be declared through the value receiver or the pointer receiver; 1. Use the value receiver such as func(rRectangle)Area()int and directly call it through rect.Area(); 2. If you need to modify the structure, use the pointer receiver such as func(r*Rectangle)SetWidth(...), and Go will automatically handle the conversion of pointers and values; 3. When embedding the structure, the method of embedded structure will be improved, and it can be called directly through the outer structure; 4. Go does not need to force use getter/setter,

In Go, an interface is a type that defines behavior without specifying implementation. An interface consists of method signatures, and any type that implements these methods automatically satisfy the interface. For example, if you define a Speaker interface that contains the Speak() method, all types that implement the method can be considered Speaker. Interfaces are suitable for writing common functions, abstract implementation details, and using mock objects in testing. Defining an interface uses the interface keyword and lists method signatures, without explicitly declaring the type to implement the interface. Common use cases include logs, formatting, abstractions of different databases or services, and notification systems. For example, both Dog and Robot types can implement Speak methods and pass them to the same Anno

TointegrateGolangserviceswithexistingPythoninfrastructure,useRESTAPIsorgRPCforinter-servicecommunication,allowingGoandPythonappstointeractseamlesslythroughstandardizedprotocols.1.UseRESTAPIs(viaframeworkslikeGininGoandFlaskinPython)orgRPC(withProtoco

Go's time package provides functions for processing time and duration, including obtaining the current time, formatting date, calculating time difference, processing time zone, scheduling and sleeping operations. To get the current time, use time.Now() to get the Time structure, and you can extract specific time information through Year(), Month(), Day() and other methods; use Format("2006-01-0215:04:05") to format the time string; when calculating the time difference, use Sub() or Since() to obtain the Duration object, and then convert it into the corresponding unit through Seconds(), Minutes(), and Hours();

InGo,ifstatementsexecutecodebasedonconditions.1.Basicstructurerunsablockifaconditionistrue,e.g.,ifx>10{...}.2.Elseclausehandlesfalseconditions,e.g.,else{...}.3.Elseifchainsmultipleconditions,e.g.,elseifx==10{...}.4.Variableinitializationinsideif,l

Golangofferssuperiorperformance,nativeconcurrencyviagoroutines,andefficientresourceusage,makingitidealforhigh-traffic,low-latencyAPIs;2.Python,whileslowerduetointerpretationandtheGIL,provideseasierdevelopment,arichecosystem,andisbettersuitedforI/O-bo

The standard way to protect critical areas in Go is to use the Lock() and Unlock() methods of sync.Mutex. 1. Declare a mutex and use it with the data to be protected; 2. Call Lock() before entering the critical area to ensure that only one goroutine can access the shared resources; 3. Use deferUnlock() to ensure that the lock is always released to avoid deadlocks; 4. Try to shorten operations in the critical area to improve performance; 5. For scenarios where more reads and less writes, sync.RWMutex should be used, read operations through RLock()/RUnlock(), and write operations through Lock()/Unlock() to improve concurrency efficiency.

Gohandlesconcurrencyusinggoroutinesandchannels.1.GoroutinesarelightweightfunctionsmanagedbytheGoruntime,enablingthousandstorunconcurrentlywithminimalresourceuse.2.Channelsprovidesafecommunicationbetweengoroutines,allowingvaluestobesentandreceivedinas
