mirror of
https://github.com/JonasunderscoreJones/jonas_jones-api.git
synced 2025-10-24 01:29:19 +02:00
some progress
This commit is contained in:
parent
aea93a5527
commit
e3c15bd288
1388 changed files with 306946 additions and 68323 deletions
156
node_modules/mongodb/lib/cmap/message_stream.js
generated
vendored
Normal file
156
node_modules/mongodb/lib/cmap/message_stream.js
generated
vendored
Normal file
|
|
@ -0,0 +1,156 @@
|
|||
"use strict";
|
||||
Object.defineProperty(exports, "__esModule", { value: true });
|
||||
exports.MessageStream = void 0;
|
||||
const stream_1 = require("stream");
|
||||
const error_1 = require("../error");
|
||||
const utils_1 = require("../utils");
|
||||
const commands_1 = require("./commands");
|
||||
const compression_1 = require("./wire_protocol/compression");
|
||||
const constants_1 = require("./wire_protocol/constants");
|
||||
const MESSAGE_HEADER_SIZE = 16;
|
||||
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID
|
||||
const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
|
||||
/** @internal */
|
||||
const kBuffer = Symbol('buffer');
|
||||
/**
|
||||
* A duplex stream that is capable of reading and writing raw wire protocol messages, with
|
||||
* support for optional compression
|
||||
* @internal
|
||||
*/
|
||||
class MessageStream extends stream_1.Duplex {
|
||||
constructor(options = {}) {
|
||||
super(options);
|
||||
/** @internal */
|
||||
this.isMonitoringConnection = false;
|
||||
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
|
||||
this[kBuffer] = new utils_1.BufferPool();
|
||||
}
|
||||
get buffer() {
|
||||
return this[kBuffer];
|
||||
}
|
||||
_write(chunk, _, callback) {
|
||||
this[kBuffer].append(chunk);
|
||||
processIncomingData(this, callback);
|
||||
}
|
||||
_read( /* size */) {
|
||||
// NOTE: This implementation is empty because we explicitly push data to be read
|
||||
// when `writeMessage` is called.
|
||||
return;
|
||||
}
|
||||
writeCommand(command, operationDescription) {
|
||||
const agreedCompressor = operationDescription.agreedCompressor ?? 'none';
|
||||
if (agreedCompressor === 'none' || !canCompress(command)) {
|
||||
const data = command.toBin();
|
||||
this.push(Array.isArray(data) ? Buffer.concat(data) : data);
|
||||
return;
|
||||
}
|
||||
// otherwise, compress the message
|
||||
const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
|
||||
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
|
||||
// Extract information needed for OP_COMPRESSED from the uncompressed message
|
||||
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
|
||||
const options = {
|
||||
agreedCompressor,
|
||||
zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0
|
||||
};
|
||||
// Compress the message body
|
||||
(0, compression_1.compress)(options, messageToBeCompressed).then(compressedMessage => {
|
||||
// Create the msgHeader of OP_COMPRESSED
|
||||
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
|
||||
msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0); // messageLength
|
||||
msgHeader.writeInt32LE(command.requestId, 4); // requestID
|
||||
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
|
||||
msgHeader.writeInt32LE(constants_1.OP_COMPRESSED, 12); // opCode
|
||||
// Create the compression details of OP_COMPRESSED
|
||||
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
|
||||
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
|
||||
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
|
||||
compressionDetails.writeUInt8(compression_1.Compressor[agreedCompressor], 8); // compressorID
|
||||
this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
|
||||
}, error => {
|
||||
operationDescription.cb(error);
|
||||
});
|
||||
}
|
||||
}
|
||||
exports.MessageStream = MessageStream;
|
||||
// Return whether a command contains an uncompressible command term
|
||||
// Will return true if command contains no uncompressible command terms
|
||||
function canCompress(command) {
|
||||
const commandDoc = command instanceof commands_1.Msg ? command.command : command.query;
|
||||
const commandName = Object.keys(commandDoc)[0];
|
||||
return !compression_1.uncompressibleCommands.has(commandName);
|
||||
}
|
||||
function processIncomingData(stream, callback) {
|
||||
const buffer = stream[kBuffer];
|
||||
const sizeOfMessage = buffer.getInt32();
|
||||
if (sizeOfMessage == null) {
|
||||
return callback();
|
||||
}
|
||||
if (sizeOfMessage < 0) {
|
||||
return callback(new error_1.MongoParseError(`Invalid message size: ${sizeOfMessage}`));
|
||||
}
|
||||
if (sizeOfMessage > stream.maxBsonMessageSize) {
|
||||
return callback(new error_1.MongoParseError(`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`));
|
||||
}
|
||||
if (sizeOfMessage > buffer.length) {
|
||||
return callback();
|
||||
}
|
||||
const message = buffer.read(sizeOfMessage);
|
||||
const messageHeader = {
|
||||
length: message.readInt32LE(0),
|
||||
requestId: message.readInt32LE(4),
|
||||
responseTo: message.readInt32LE(8),
|
||||
opCode: message.readInt32LE(12)
|
||||
};
|
||||
const monitorHasAnotherHello = () => {
|
||||
if (stream.isMonitoringConnection) {
|
||||
// Can we read the next message size?
|
||||
const sizeOfMessage = buffer.getInt32();
|
||||
if (sizeOfMessage != null && sizeOfMessage <= buffer.length) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
let ResponseType = messageHeader.opCode === constants_1.OP_MSG ? commands_1.BinMsg : commands_1.Response;
|
||||
if (messageHeader.opCode !== constants_1.OP_COMPRESSED) {
|
||||
const messageBody = message.subarray(MESSAGE_HEADER_SIZE);
|
||||
// If we are a monitoring connection message stream and
|
||||
// there is more in the buffer that can be read, skip processing since we
|
||||
// want the last hello command response that is in the buffer.
|
||||
if (monitorHasAnotherHello()) {
|
||||
return processIncomingData(stream, callback);
|
||||
}
|
||||
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
|
||||
if (buffer.length >= 4) {
|
||||
return processIncomingData(stream, callback);
|
||||
}
|
||||
return callback();
|
||||
}
|
||||
messageHeader.fromCompressed = true;
|
||||
messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
|
||||
messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
|
||||
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
|
||||
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
|
||||
// recalculate based on wrapped opcode
|
||||
ResponseType = messageHeader.opCode === constants_1.OP_MSG ? commands_1.BinMsg : commands_1.Response;
|
||||
(0, compression_1.decompress)(compressorID, compressedBuffer).then(messageBody => {
|
||||
if (messageBody.length !== messageHeader.length) {
|
||||
return callback(new error_1.MongoDecompressionError('Message body and message header must be the same length'));
|
||||
}
|
||||
// If we are a monitoring connection message stream and
|
||||
// there is more in the buffer that can be read, skip processing since we
|
||||
// want the last hello command response that is in the buffer.
|
||||
if (monitorHasAnotherHello()) {
|
||||
return processIncomingData(stream, callback);
|
||||
}
|
||||
stream.emit('message', new ResponseType(message, messageHeader, messageBody));
|
||||
if (buffer.length >= 4) {
|
||||
return processIncomingData(stream, callback);
|
||||
}
|
||||
return callback();
|
||||
}, error => {
|
||||
return callback(error);
|
||||
});
|
||||
}
|
||||
//# sourceMappingURL=message_stream.js.map
|
||||
Loading…
Add table
Add a link
Reference in a new issue