From 0455504e22639e9c475447034b93f5161c1327b4 Mon Sep 17 00:00:00 2001 From: regevbr Date: Thu, 18 Jun 2020 14:49:50 +0300 Subject: [PATCH 01/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 44 ++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 4044dae1c..61f765fa8 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -75,6 +75,8 @@ export type MessageCallback = (msg: BackendMessage) => void export class Parser { private remainingBuffer: Buffer = emptyBuffer + private remainingBufferLength: number = 0 + private remainingBufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -87,13 +89,33 @@ export class Parser { public parse(buffer: Buffer, callback: MessageCallback) { let combinedBuffer = buffer - if (this.remainingBuffer.byteLength) { - combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength) - this.remainingBuffer.copy(combinedBuffer) - buffer.copy(combinedBuffer, this.remainingBuffer.byteLength) + let combinedBufferOffset = 0 + let combinedBufferLength = buffer.byteLength + const newRealLength = this.remainingBufferLength + combinedBufferLength + if (this.remainingBufferLength) { + const newLength = newRealLength + this.remainingBufferOffset + if (newLength > this.remainingBuffer.byteLength) { + let newBufferLength = this.remainingBufferLength * 2 + while (newRealLength >= newBufferLength) { + newBufferLength *= 2 + } + const newBuffer = Buffer.allocUnsafe(newBufferLength) + this.remainingBuffer.copy( + newBuffer, + 0, + this.remainingBufferOffset, + this.remainingBufferOffset + this.remainingBufferLength + ) + this.remainingBuffer = newBuffer + this.remainingBufferOffset = 0 + } + buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) + combinedBuffer = this.remainingBuffer + combinedBufferLength = newRealLength + combinedBufferOffset = this.remainingBufferOffset } - let offset = 0 - while (offset + HEADER_LENGTH <= combinedBuffer.byteLength) { + let offset = combinedBufferOffset + while (offset + HEADER_LENGTH <= combinedBufferLength) { // code is 1 byte long - it identifies the message type const code = combinedBuffer[offset] @@ -102,7 +124,7 @@ export class Parser { const fullMessageLength = CODE_LENGTH + length - if (fullMessageLength + offset <= combinedBuffer.byteLength) { + if (fullMessageLength + offset <= combinedBufferLength) { const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) callback(message) offset += fullMessageLength @@ -111,10 +133,14 @@ export class Parser { } } - if (offset === combinedBuffer.byteLength) { + if (offset === combinedBufferLength) { this.remainingBuffer = emptyBuffer + this.remainingBufferLength = 0 + this.remainingBufferOffset = 0 } else { - this.remainingBuffer = combinedBuffer.slice(offset) + this.remainingBuffer = combinedBuffer + this.remainingBufferLength = combinedBufferLength - offset + this.remainingBufferOffset += offset } } From c31205f4373f9820697f06d8f8875e31c7c0877f Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 19 Jun 2020 02:32:00 +0300 Subject: [PATCH 02/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 61f765fa8..657514dde 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -91,11 +91,12 @@ export class Parser { let combinedBuffer = buffer let combinedBufferOffset = 0 let combinedBufferLength = buffer.byteLength - const newRealLength = this.remainingBufferLength + combinedBufferLength - if (this.remainingBufferLength) { + let remainingBufferNotEmpty = this.remainingBufferLength > 0 + if (remainingBufferNotEmpty) { + const newRealLength = this.remainingBufferLength + combinedBufferLength const newLength = newRealLength + this.remainingBufferOffset if (newLength > this.remainingBuffer.byteLength) { - let newBufferLength = this.remainingBufferLength * 2 + let newBufferLength = this.remainingBuffer.byteLength * 2 while (newRealLength >= newBufferLength) { newBufferLength *= 2 } @@ -111,11 +112,12 @@ export class Parser { } buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) combinedBuffer = this.remainingBuffer - combinedBufferLength = newRealLength + combinedBufferLength = this.remainingBufferLength = newRealLength combinedBufferOffset = this.remainingBufferOffset } + const realLength = combinedBufferOffset + combinedBufferLength let offset = combinedBufferOffset - while (offset + HEADER_LENGTH <= combinedBufferLength) { + while (offset + HEADER_LENGTH <= realLength) { // code is 1 byte long - it identifies the message type const code = combinedBuffer[offset] @@ -124,7 +126,7 @@ export class Parser { const fullMessageLength = CODE_LENGTH + length - if (fullMessageLength + offset <= combinedBufferLength) { + if (fullMessageLength + offset <= realLength) { const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) callback(message) offset += fullMessageLength @@ -133,12 +135,12 @@ export class Parser { } } - if (offset === combinedBufferLength) { + if (offset === realLength) { this.remainingBuffer = emptyBuffer this.remainingBufferLength = 0 this.remainingBufferOffset = 0 } else { - this.remainingBuffer = combinedBuffer + this.remainingBuffer = remainingBufferNotEmpty ? combinedBuffer : combinedBuffer.slice() this.remainingBufferLength = combinedBufferLength - offset this.remainingBufferOffset += offset } From 13ff0e11ed0c93eebe40a55296660247866e7b94 Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 19 Jun 2020 02:53:17 +0300 Subject: [PATCH 03/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 35 +++++++++++++++++------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 657514dde..63303ac83 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -89,15 +89,15 @@ export class Parser { public parse(buffer: Buffer, callback: MessageCallback) { let combinedBuffer = buffer - let combinedBufferOffset = 0 let combinedBufferLength = buffer.byteLength - let remainingBufferNotEmpty = this.remainingBufferLength > 0 - if (remainingBufferNotEmpty) { - const newRealLength = this.remainingBufferLength + combinedBufferLength - const newLength = newRealLength + this.remainingBufferOffset - if (newLength > this.remainingBuffer.byteLength) { + let combinedBufferOffset = 0 + let reuseRemainingBuffer = this.remainingBufferLength > 0 + if (reuseRemainingBuffer) { + const newLength = this.remainingBufferLength + combinedBufferLength + const newFullLength = newLength + this.remainingBufferOffset + if (newFullLength > this.remainingBuffer.byteLength) { let newBufferLength = this.remainingBuffer.byteLength * 2 - while (newRealLength >= newBufferLength) { + while (newLength >= newBufferLength) { newBufferLength *= 2 } const newBuffer = Buffer.allocUnsafe(newBufferLength) @@ -112,12 +112,12 @@ export class Parser { } buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) combinedBuffer = this.remainingBuffer - combinedBufferLength = this.remainingBufferLength = newRealLength + combinedBufferLength = this.remainingBufferLength = newLength combinedBufferOffset = this.remainingBufferOffset } - const realLength = combinedBufferOffset + combinedBufferLength + const fullLength = combinedBufferOffset + combinedBufferLength let offset = combinedBufferOffset - while (offset + HEADER_LENGTH <= realLength) { + while (offset + HEADER_LENGTH <= fullLength) { // code is 1 byte long - it identifies the message type const code = combinedBuffer[offset] @@ -126,7 +126,7 @@ export class Parser { const fullMessageLength = CODE_LENGTH + length - if (fullMessageLength + offset <= realLength) { + if (fullMessageLength + offset <= fullLength) { const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) callback(message) offset += fullMessageLength @@ -135,14 +135,19 @@ export class Parser { } } - if (offset === realLength) { + if (offset === fullLength) { this.remainingBuffer = emptyBuffer this.remainingBufferLength = 0 this.remainingBufferOffset = 0 } else { - this.remainingBuffer = remainingBufferNotEmpty ? combinedBuffer : combinedBuffer.slice() - this.remainingBufferLength = combinedBufferLength - offset - this.remainingBufferOffset += offset + if (reuseRemainingBuffer) { + this.remainingBufferLength = combinedBufferLength - offset + this.remainingBufferOffset += offset + } else { + this.remainingBuffer = combinedBuffer.slice(offset) + this.remainingBufferLength = this.remainingBuffer.byteLength + this.remainingBufferOffset = 0 + } } } From 316b119e63f50b60f540f1390d36f341317ae01a Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 19 Jun 2020 03:27:39 +0300 Subject: [PATCH 04/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 63303ac83..eabb1e3d7 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -96,11 +96,20 @@ export class Parser { const newLength = this.remainingBufferLength + combinedBufferLength const newFullLength = newLength + this.remainingBufferOffset if (newFullLength > this.remainingBuffer.byteLength) { - let newBufferLength = this.remainingBuffer.byteLength * 2 - while (newLength >= newBufferLength) { - newBufferLength *= 2 + // We can't concat the new buffer with the remaining one + let newBuffer: Buffer + if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) { + // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer + newBuffer = this.remainingBuffer + } else { + // Allocate a new larger buffer + let newBufferLength = this.remainingBuffer.byteLength * 2 + while (newLength >= newBufferLength) { + newBufferLength *= 2 + } + newBuffer = Buffer.allocUnsafe(newBufferLength) } - const newBuffer = Buffer.allocUnsafe(newBufferLength) + // Move the remaining buffer to the new one this.remainingBuffer.copy( newBuffer, 0, @@ -110,6 +119,7 @@ export class Parser { this.remainingBuffer = newBuffer this.remainingBufferOffset = 0 } + // Concat the new buffer with the remaining one buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) combinedBuffer = this.remainingBuffer combinedBufferLength = this.remainingBufferLength = newLength @@ -134,16 +144,18 @@ export class Parser { break } } - if (offset === fullLength) { + // No more use for the buffer this.remainingBuffer = emptyBuffer this.remainingBufferLength = 0 this.remainingBufferOffset = 0 } else { if (reuseRemainingBuffer) { + // Adjust the cursors of remainingBuffer this.remainingBufferLength = combinedBufferLength - offset this.remainingBufferOffset += offset } else { + // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer this.remainingBuffer = combinedBuffer.slice(offset) this.remainingBufferLength = this.remainingBuffer.byteLength this.remainingBufferOffset = 0 From 89758cee2f7306d1a3471fe9f64d86f5c25aa8b4 Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 19 Jun 2020 03:39:06 +0300 Subject: [PATCH 05/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index eabb1e3d7..56670fd75 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -150,14 +150,14 @@ export class Parser { this.remainingBufferLength = 0 this.remainingBufferOffset = 0 } else { + this.remainingBufferLength = fullLength - offset if (reuseRemainingBuffer) { // Adjust the cursors of remainingBuffer - this.remainingBufferLength = combinedBufferLength - offset - this.remainingBufferOffset += offset + this.remainingBufferOffset = offset } else { - // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer - this.remainingBuffer = combinedBuffer.slice(offset) - this.remainingBufferLength = this.remainingBuffer.byteLength + // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer + this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) + combinedBuffer.copy(this.remainingBuffer, 0, offset) this.remainingBufferOffset = 0 } } From 5e0d684446e044d3c3d979fd09bb3247acbc006f Mon Sep 17 00:00:00 2001 From: regevbr Date: Sat, 20 Jun 2020 10:44:28 +0300 Subject: [PATCH 06/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 85 ++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 56670fd75..1827c3d1f 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -73,6 +73,14 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void +interface CombinedBuffer { + combinedBuffer: Buffer + combinedBufferOffset: number + combinedBufferLength: number + combinedBufferFullLength: number + reuseRemainingBuffer: boolean +} + export class Parser { private remainingBuffer: Buffer = emptyBuffer private remainingBufferLength: number = 0 @@ -88,6 +96,41 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { + const { + combinedBuffer, + combinedBufferOffset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + } = this.mergeBuffer(buffer) + let offset = combinedBufferOffset + while (offset + HEADER_LENGTH <= combinedBufferFullLength) { + // code is 1 byte long - it identifies the message type + const code = combinedBuffer[offset] + + // length is 1 Uint32BE - it is the length of the message EXCLUDING the code + const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) + + const fullMessageLength = CODE_LENGTH + length + + if (fullMessageLength + offset <= combinedBufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) + callback(message) + offset += fullMessageLength + } else { + break + } + } + this.consumeBuffer({ + combinedBuffer, + combinedBufferOffset: offset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + }) + } + + private mergeBuffer(buffer: Buffer): CombinedBuffer { let combinedBuffer = buffer let combinedBufferLength = buffer.byteLength let combinedBufferOffset = 0 @@ -125,39 +168,37 @@ export class Parser { combinedBufferLength = this.remainingBufferLength = newLength combinedBufferOffset = this.remainingBufferOffset } - const fullLength = combinedBufferOffset + combinedBufferLength - let offset = combinedBufferOffset - while (offset + HEADER_LENGTH <= fullLength) { - // code is 1 byte long - it identifies the message type - const code = combinedBuffer[offset] - - // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) - - const fullMessageLength = CODE_LENGTH + length - - if (fullMessageLength + offset <= fullLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) - callback(message) - offset += fullMessageLength - } else { - break - } + const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength + return { + combinedBuffer, + combinedBufferOffset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, } - if (offset === fullLength) { + } + + private consumeBuffer({ + combinedBufferOffset, + combinedBufferFullLength, + reuseRemainingBuffer, + combinedBuffer, + combinedBufferLength, + }: CombinedBuffer) { + if (combinedBufferOffset === combinedBufferFullLength) { // No more use for the buffer this.remainingBuffer = emptyBuffer this.remainingBufferLength = 0 this.remainingBufferOffset = 0 } else { - this.remainingBufferLength = fullLength - offset + this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset if (reuseRemainingBuffer) { // Adjust the cursors of remainingBuffer - this.remainingBufferOffset = offset + this.remainingBufferOffset = combinedBufferOffset } else { // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) - combinedBuffer.copy(this.remainingBuffer, 0, offset) + combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset) this.remainingBufferOffset = 0 } } From 64c78b0b0ef41d8da966c20a3b97eab74c1c3c60 Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 3 Jul 2020 17:52:26 +0300 Subject: [PATCH 07/11] fix: major performance issues with bytea performance #2240 --- package.json | 2 +- packages/pg-protocol/src/parser.ts | 128 +++++++++-------------------- packages/pg/bench.js | 20 ++++- 3 files changed, 55 insertions(+), 95 deletions(-) diff --git a/package.json b/package.json index 282ca9376..6ab9fa918 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "packages/*" ], "scripts": { - "test": "yarn lint && yarn lerna exec yarn test", + "test": "export PGDATABASE=data && export PGUSER=user && export PGPASSWORD=pass && yarn lint && yarn lerna exec yarn test", "build": "yarn lerna exec --scope pg-protocol yarn build", "pretest": "yarn build", "lint": "if [ -x ./node_modules/.bin/prettier ]; then eslint '*/**/*.{js,ts,tsx}'; fi;" diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 1827c3d1f..a00dabec9 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -73,18 +73,10 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void -interface CombinedBuffer { - combinedBuffer: Buffer - combinedBufferOffset: number - combinedBufferLength: number - combinedBufferFullLength: number - reuseRemainingBuffer: boolean -} - export class Parser { - private remainingBuffer: Buffer = emptyBuffer - private remainingBufferLength: number = 0 - private remainingBufferOffset: number = 0 + private buffer: Buffer = emptyBuffer + private bufferLength: number = 0 + private bufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -96,111 +88,65 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { - const { - combinedBuffer, - combinedBufferOffset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - } = this.mergeBuffer(buffer) - let offset = combinedBufferOffset - while (offset + HEADER_LENGTH <= combinedBufferFullLength) { + this.mergeBuffer(buffer) + const bufferFullLength = this.bufferOffset + this.bufferLength + let offset = this.bufferOffset + while (offset + HEADER_LENGTH <= bufferFullLength) { // code is 1 byte long - it identifies the message type - const code = combinedBuffer[offset] - + const code = this.buffer[offset] // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) - + const length = this.buffer.readUInt32BE(offset + CODE_LENGTH) const fullMessageLength = CODE_LENGTH + length - - if (fullMessageLength + offset <= combinedBufferFullLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) + if (fullMessageLength + offset <= bufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer) callback(message) offset += fullMessageLength } else { break } } - this.consumeBuffer({ - combinedBuffer, - combinedBufferOffset: offset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - }) + if (offset === bufferFullLength) { + // No more use for the buffer + this.buffer = emptyBuffer + this.bufferLength = 0 + this.bufferOffset = 0 + } else { + // Adjust the cursors of remainingBuffer + this.bufferLength = bufferFullLength - offset + this.bufferOffset = offset + } } - private mergeBuffer(buffer: Buffer): CombinedBuffer { - let combinedBuffer = buffer - let combinedBufferLength = buffer.byteLength - let combinedBufferOffset = 0 - let reuseRemainingBuffer = this.remainingBufferLength > 0 - if (reuseRemainingBuffer) { - const newLength = this.remainingBufferLength + combinedBufferLength - const newFullLength = newLength + this.remainingBufferOffset - if (newFullLength > this.remainingBuffer.byteLength) { + private mergeBuffer(buffer: Buffer): void { + if (this.bufferLength > 0) { + const newLength = this.bufferLength + buffer.byteLength + const newFullLength = newLength + this.bufferOffset + if (newFullLength > this.buffer.byteLength) { // We can't concat the new buffer with the remaining one let newBuffer: Buffer - if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) { + if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) { // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer - newBuffer = this.remainingBuffer + newBuffer = this.buffer } else { // Allocate a new larger buffer - let newBufferLength = this.remainingBuffer.byteLength * 2 + let newBufferLength = this.buffer.byteLength * 2 while (newLength >= newBufferLength) { newBufferLength *= 2 } newBuffer = Buffer.allocUnsafe(newBufferLength) } // Move the remaining buffer to the new one - this.remainingBuffer.copy( - newBuffer, - 0, - this.remainingBufferOffset, - this.remainingBufferOffset + this.remainingBufferLength - ) - this.remainingBuffer = newBuffer - this.remainingBufferOffset = 0 + this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength) + this.buffer = newBuffer + this.bufferOffset = 0 } // Concat the new buffer with the remaining one - buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) - combinedBuffer = this.remainingBuffer - combinedBufferLength = this.remainingBufferLength = newLength - combinedBufferOffset = this.remainingBufferOffset - } - const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength - return { - combinedBuffer, - combinedBufferOffset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - } - } - - private consumeBuffer({ - combinedBufferOffset, - combinedBufferFullLength, - reuseRemainingBuffer, - combinedBuffer, - combinedBufferLength, - }: CombinedBuffer) { - if (combinedBufferOffset === combinedBufferFullLength) { - // No more use for the buffer - this.remainingBuffer = emptyBuffer - this.remainingBufferLength = 0 - this.remainingBufferOffset = 0 + buffer.copy(this.buffer, this.bufferOffset + this.bufferLength) + this.bufferLength = newLength } else { - this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset - if (reuseRemainingBuffer) { - // Adjust the cursors of remainingBuffer - this.remainingBufferOffset = combinedBufferOffset - } else { - // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer - this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) - combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset) - this.remainingBufferOffset = 0 - } + this.buffer = buffer + this.bufferOffset = 0 + this.bufferLength = buffer.byteLength } } diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 80c07dc19..c861c3ae6 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -1,5 +1,4 @@ const pg = require('./lib') -const pool = new pg.Pool() const params = { text: @@ -17,7 +16,7 @@ const seq = { } const exec = async (client, q) => { - const result = await client.query({ + await client.query({ text: q.text, values: q.values, rowMode: 'array', @@ -40,6 +39,7 @@ const run = async () => { const client = new pg.Client() await client.connect() await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)') + await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)') await bench(client, params, 1000) console.log('warmup done') const seconds = 5 @@ -61,7 +61,21 @@ const run = async () => { console.log('insert queries:', queries) console.log('qps', queries / seconds) console.log('on my laptop best so far seen 5799 qps') - console.log() + + console.log('') + console.log('Warming up bytea test') + await client.query({ + text: 'INSERT INTO buf(name, data) VALUES ($1, $2)', + values: ['test', Buffer.allocUnsafe(104857600)], + }) + console.log('bytea warmup done') + const start = Date.now() + const results = await client.query('SELECT * FROM buf') + const time = Date.now() - start + console.log('bytea time:', time, 'ms') + console.log('bytea length:', results.rows[0].data.byteLength, 'bytes') + console.log('on my laptop best so far seen 1107ms and 104857600 bytes') + await client.end() await client.end() } From bf53552a15d1f09dbbd119b13711a13adf60b0b9 Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 3 Jul 2020 17:53:22 +0300 Subject: [PATCH 08/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 128 ++++++++++++++++++++--------- packages/pg/bench.js | 20 +---- 2 files changed, 94 insertions(+), 54 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index a00dabec9..1827c3d1f 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -73,10 +73,18 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void +interface CombinedBuffer { + combinedBuffer: Buffer + combinedBufferOffset: number + combinedBufferLength: number + combinedBufferFullLength: number + reuseRemainingBuffer: boolean +} + export class Parser { - private buffer: Buffer = emptyBuffer - private bufferLength: number = 0 - private bufferOffset: number = 0 + private remainingBuffer: Buffer = emptyBuffer + private remainingBufferLength: number = 0 + private remainingBufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -88,65 +96,111 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { - this.mergeBuffer(buffer) - const bufferFullLength = this.bufferOffset + this.bufferLength - let offset = this.bufferOffset - while (offset + HEADER_LENGTH <= bufferFullLength) { + const { + combinedBuffer, + combinedBufferOffset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + } = this.mergeBuffer(buffer) + let offset = combinedBufferOffset + while (offset + HEADER_LENGTH <= combinedBufferFullLength) { // code is 1 byte long - it identifies the message type - const code = this.buffer[offset] + const code = combinedBuffer[offset] + // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = this.buffer.readUInt32BE(offset + CODE_LENGTH) + const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) + const fullMessageLength = CODE_LENGTH + length - if (fullMessageLength + offset <= bufferFullLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer) + + if (fullMessageLength + offset <= combinedBufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) callback(message) offset += fullMessageLength } else { break } } - if (offset === bufferFullLength) { - // No more use for the buffer - this.buffer = emptyBuffer - this.bufferLength = 0 - this.bufferOffset = 0 - } else { - // Adjust the cursors of remainingBuffer - this.bufferLength = bufferFullLength - offset - this.bufferOffset = offset - } + this.consumeBuffer({ + combinedBuffer, + combinedBufferOffset: offset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + }) } - private mergeBuffer(buffer: Buffer): void { - if (this.bufferLength > 0) { - const newLength = this.bufferLength + buffer.byteLength - const newFullLength = newLength + this.bufferOffset - if (newFullLength > this.buffer.byteLength) { + private mergeBuffer(buffer: Buffer): CombinedBuffer { + let combinedBuffer = buffer + let combinedBufferLength = buffer.byteLength + let combinedBufferOffset = 0 + let reuseRemainingBuffer = this.remainingBufferLength > 0 + if (reuseRemainingBuffer) { + const newLength = this.remainingBufferLength + combinedBufferLength + const newFullLength = newLength + this.remainingBufferOffset + if (newFullLength > this.remainingBuffer.byteLength) { // We can't concat the new buffer with the remaining one let newBuffer: Buffer - if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) { + if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) { // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer - newBuffer = this.buffer + newBuffer = this.remainingBuffer } else { // Allocate a new larger buffer - let newBufferLength = this.buffer.byteLength * 2 + let newBufferLength = this.remainingBuffer.byteLength * 2 while (newLength >= newBufferLength) { newBufferLength *= 2 } newBuffer = Buffer.allocUnsafe(newBufferLength) } // Move the remaining buffer to the new one - this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength) - this.buffer = newBuffer - this.bufferOffset = 0 + this.remainingBuffer.copy( + newBuffer, + 0, + this.remainingBufferOffset, + this.remainingBufferOffset + this.remainingBufferLength + ) + this.remainingBuffer = newBuffer + this.remainingBufferOffset = 0 } // Concat the new buffer with the remaining one - buffer.copy(this.buffer, this.bufferOffset + this.bufferLength) - this.bufferLength = newLength + buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) + combinedBuffer = this.remainingBuffer + combinedBufferLength = this.remainingBufferLength = newLength + combinedBufferOffset = this.remainingBufferOffset + } + const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength + return { + combinedBuffer, + combinedBufferOffset, + combinedBufferLength, + reuseRemainingBuffer, + combinedBufferFullLength, + } + } + + private consumeBuffer({ + combinedBufferOffset, + combinedBufferFullLength, + reuseRemainingBuffer, + combinedBuffer, + combinedBufferLength, + }: CombinedBuffer) { + if (combinedBufferOffset === combinedBufferFullLength) { + // No more use for the buffer + this.remainingBuffer = emptyBuffer + this.remainingBufferLength = 0 + this.remainingBufferOffset = 0 } else { - this.buffer = buffer - this.bufferOffset = 0 - this.bufferLength = buffer.byteLength + this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset + if (reuseRemainingBuffer) { + // Adjust the cursors of remainingBuffer + this.remainingBufferOffset = combinedBufferOffset + } else { + // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer + this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) + combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset) + this.remainingBufferOffset = 0 + } } } diff --git a/packages/pg/bench.js b/packages/pg/bench.js index c861c3ae6..80c07dc19 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -1,4 +1,5 @@ const pg = require('./lib') +const pool = new pg.Pool() const params = { text: @@ -16,7 +17,7 @@ const seq = { } const exec = async (client, q) => { - await client.query({ + const result = await client.query({ text: q.text, values: q.values, rowMode: 'array', @@ -39,7 +40,6 @@ const run = async () => { const client = new pg.Client() await client.connect() await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)') - await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)') await bench(client, params, 1000) console.log('warmup done') const seconds = 5 @@ -61,21 +61,7 @@ const run = async () => { console.log('insert queries:', queries) console.log('qps', queries / seconds) console.log('on my laptop best so far seen 5799 qps') - - console.log('') - console.log('Warming up bytea test') - await client.query({ - text: 'INSERT INTO buf(name, data) VALUES ($1, $2)', - values: ['test', Buffer.allocUnsafe(104857600)], - }) - console.log('bytea warmup done') - const start = Date.now() - const results = await client.query('SELECT * FROM buf') - const time = Date.now() - start - console.log('bytea time:', time, 'ms') - console.log('bytea length:', results.rows[0].data.byteLength, 'bytes') - console.log('on my laptop best so far seen 1107ms and 104857600 bytes') - + console.log() await client.end() await client.end() } From 410a6ab2486446129bced11aaf942a53e3bf30cb Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 3 Jul 2020 17:54:29 +0300 Subject: [PATCH 09/11] fix: major performance issues with bytea performance #2240 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 6ab9fa918..282ca9376 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "packages/*" ], "scripts": { - "test": "export PGDATABASE=data && export PGUSER=user && export PGPASSWORD=pass && yarn lint && yarn lerna exec yarn test", + "test": "yarn lint && yarn lerna exec yarn test", "build": "yarn lerna exec --scope pg-protocol yarn build", "pretest": "yarn build", "lint": "if [ -x ./node_modules/.bin/prettier ]; then eslint '*/**/*.{js,ts,tsx}'; fi;" From 69af2672ed3ece1872f60d4b4398676901971a8f Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 3 Jul 2020 17:56:13 +0300 Subject: [PATCH 10/11] fix: major performance issues with bytea performance #2240 --- packages/pg/bench.js | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 80c07dc19..1c1aa641d 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -1,5 +1,4 @@ const pg = require('./lib') -const pool = new pg.Pool() const params = { text: @@ -17,7 +16,7 @@ const seq = { } const exec = async (client, q) => { - const result = await client.query({ + await client.query({ text: q.text, values: q.values, rowMode: 'array', @@ -39,7 +38,9 @@ const bench = async (client, q, time) => { const run = async () => { const client = new pg.Client() await client.connect() + console.log('start') await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)') + await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)') await bench(client, params, 1000) console.log('warmup done') const seconds = 5 @@ -61,7 +62,21 @@ const run = async () => { console.log('insert queries:', queries) console.log('qps', queries / seconds) console.log('on my laptop best so far seen 5799 qps') - console.log() + + console.log('') + console.log('Warming up bytea test') + await client.query({ + text: 'INSERT INTO buf(name, data) VALUES ($1, $2)', + values: ['test', Buffer.allocUnsafe(104857600)], + }) + console.log('bytea warmup done') + const start = Date.now() + const results = await client.query('SELECT * FROM buf') + const time = Date.now() - start + console.log('bytea time:', time, 'ms') + console.log('bytea length:', results.rows[0].data.byteLength, 'bytes') + console.log('on my laptop best so far seen 1107ms and 104857600 bytes') + await client.end() await client.end() } From 1d3f155d4ffa5ac4200cfcc8ceb4d338790e5556 Mon Sep 17 00:00:00 2001 From: regevbr Date: Fri, 3 Jul 2020 17:57:07 +0300 Subject: [PATCH 11/11] fix: major performance issues with bytea performance #2240 --- packages/pg-protocol/src/parser.ts | 128 +++++++++-------------------- 1 file changed, 37 insertions(+), 91 deletions(-) diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 1827c3d1f..a00dabec9 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -73,18 +73,10 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void -interface CombinedBuffer { - combinedBuffer: Buffer - combinedBufferOffset: number - combinedBufferLength: number - combinedBufferFullLength: number - reuseRemainingBuffer: boolean -} - export class Parser { - private remainingBuffer: Buffer = emptyBuffer - private remainingBufferLength: number = 0 - private remainingBufferOffset: number = 0 + private buffer: Buffer = emptyBuffer + private bufferLength: number = 0 + private bufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -96,111 +88,65 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { - const { - combinedBuffer, - combinedBufferOffset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - } = this.mergeBuffer(buffer) - let offset = combinedBufferOffset - while (offset + HEADER_LENGTH <= combinedBufferFullLength) { + this.mergeBuffer(buffer) + const bufferFullLength = this.bufferOffset + this.bufferLength + let offset = this.bufferOffset + while (offset + HEADER_LENGTH <= bufferFullLength) { // code is 1 byte long - it identifies the message type - const code = combinedBuffer[offset] - + const code = this.buffer[offset] // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) - + const length = this.buffer.readUInt32BE(offset + CODE_LENGTH) const fullMessageLength = CODE_LENGTH + length - - if (fullMessageLength + offset <= combinedBufferFullLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) + if (fullMessageLength + offset <= bufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer) callback(message) offset += fullMessageLength } else { break } } - this.consumeBuffer({ - combinedBuffer, - combinedBufferOffset: offset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - }) + if (offset === bufferFullLength) { + // No more use for the buffer + this.buffer = emptyBuffer + this.bufferLength = 0 + this.bufferOffset = 0 + } else { + // Adjust the cursors of remainingBuffer + this.bufferLength = bufferFullLength - offset + this.bufferOffset = offset + } } - private mergeBuffer(buffer: Buffer): CombinedBuffer { - let combinedBuffer = buffer - let combinedBufferLength = buffer.byteLength - let combinedBufferOffset = 0 - let reuseRemainingBuffer = this.remainingBufferLength > 0 - if (reuseRemainingBuffer) { - const newLength = this.remainingBufferLength + combinedBufferLength - const newFullLength = newLength + this.remainingBufferOffset - if (newFullLength > this.remainingBuffer.byteLength) { + private mergeBuffer(buffer: Buffer): void { + if (this.bufferLength > 0) { + const newLength = this.bufferLength + buffer.byteLength + const newFullLength = newLength + this.bufferOffset + if (newFullLength > this.buffer.byteLength) { // We can't concat the new buffer with the remaining one let newBuffer: Buffer - if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) { + if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) { // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer - newBuffer = this.remainingBuffer + newBuffer = this.buffer } else { // Allocate a new larger buffer - let newBufferLength = this.remainingBuffer.byteLength * 2 + let newBufferLength = this.buffer.byteLength * 2 while (newLength >= newBufferLength) { newBufferLength *= 2 } newBuffer = Buffer.allocUnsafe(newBufferLength) } // Move the remaining buffer to the new one - this.remainingBuffer.copy( - newBuffer, - 0, - this.remainingBufferOffset, - this.remainingBufferOffset + this.remainingBufferLength - ) - this.remainingBuffer = newBuffer - this.remainingBufferOffset = 0 + this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength) + this.buffer = newBuffer + this.bufferOffset = 0 } // Concat the new buffer with the remaining one - buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) - combinedBuffer = this.remainingBuffer - combinedBufferLength = this.remainingBufferLength = newLength - combinedBufferOffset = this.remainingBufferOffset - } - const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength - return { - combinedBuffer, - combinedBufferOffset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - } - } - - private consumeBuffer({ - combinedBufferOffset, - combinedBufferFullLength, - reuseRemainingBuffer, - combinedBuffer, - combinedBufferLength, - }: CombinedBuffer) { - if (combinedBufferOffset === combinedBufferFullLength) { - // No more use for the buffer - this.remainingBuffer = emptyBuffer - this.remainingBufferLength = 0 - this.remainingBufferOffset = 0 + buffer.copy(this.buffer, this.bufferOffset + this.bufferLength) + this.bufferLength = newLength } else { - this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset - if (reuseRemainingBuffer) { - // Adjust the cursors of remainingBuffer - this.remainingBufferOffset = combinedBufferOffset - } else { - // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer - this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) - combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset) - this.remainingBufferOffset = 0 - } + this.buffer = buffer + this.bufferOffset = 0 + this.bufferLength = buffer.byteLength } }