class Mongo::Protocol::Msg
MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.
OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.
@api private
@since 2.5.0
Constants
- DATABASE_IDENTIFIER
The identifier for the database name to execute the command on.
@since 2.5.0
- FLAGS
Available flags for a OP_MSG message.
- OP_CODE
The operation code required to specify a OP_MSG message. @return [ Fixnum ] the operation code.
@since 2.5.0
Public Class Methods
Creates a new OP_MSG protocol message
@example Create a OP_MSG wire protocol message
Msg.new([:more_to_come], {}, { ismaster: 1 }, { type: 1, payload: { identifier: 'documents', sequence: [..] } })
@param [ Array<Symbol> ] flags The flag bits. Current supported values are :more_to_come and :checksum_present. @param [ Hash ] options The options. There are currently no supported
options, this is a placeholder for the future.
@param [ BSON::Document, Hash ] #global_args The global arguments,
becomes a section of payload type 0
@param [ BSON::Document, Hash ] sections Zero or more sections, in the format
{ type: 1, payload: { identifier: <String>, sequence: <Array<BSON::Document, Hash>> } } or { type: 0, payload: <BSON::Document, Hash> }
@option options [ true, false ] validating_keys Whether keys should be validated.
@api private
@since 2.5.0
# File lib/mongo/protocol/msg.rb, line 55 def initialize(flags, options, global_args, *sections) @flags = flags || [ :none ] @options = options @global_args = global_args @sections = [ { type: 0, payload: global_args } ] + sections @request_id = nil super end
Public Instance Methods
Compress this message.
@param [ String, Symbol ] compressor The compressor to use. @param [ Integer ] zlib_compression_level The zlib compression level to use.
@return [ Compressed, self ] A Protocol::Compressed message or self, depending on whether
this message can be compressed.
@since 2.5.0
# File lib/mongo/protocol/msg.rb, line 117 def compress!(compressor, zlib_compression_level = nil) if compressor && compression_allowed?(command.keys.first) Compressed.new(self, compressor, zlib_compression_level) else self end end
Return the event payload for monitoring.
@example Return the event payload.
message.payload
@return [ BSON::Document ] The event payload.
@since 2.5.0
# File lib/mongo/protocol/msg.rb, line 84 def payload BSON::Document.new( command_name: command.keys.first, database_name: global_args[DATABASE_IDENTIFIER], command: command, request_id: request_id, reply: sections[0] ) end
Whether the message expects a reply from the database.
@example Does the message require a reply?
message.replyable?
@return [ true, false ] If the message expects a reply.
@since 2.5.0
# File lib/mongo/protocol/msg.rb, line 72 def replyable? @replyable ||= !flags.include?(:more_to_come) end
Serializes message into bytes that can be sent on the wire.
@param [ BSON::ByteBuffer ] buffer where the message should be inserted. @param [ Integer ] max_bson_size The maximum bson object size.
@return [ BSON::ByteBuffer ] buffer containing the serialized message.
@since 2.5.0
# File lib/mongo/protocol/msg.rb, line 102 def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil) super add_check_sum(buffer) buffer end
Private Instance Methods
# File lib/mongo/protocol/msg.rb, line 140 def add_check_sum(buffer) if flags.include?(:checksum_present) #buffer.put_int32(checksum) end end
# File lib/mongo/protocol/msg.rb, line 127 def command @command ||= global_args.dup.tap do |cmd| cmd.delete(DATABASE_IDENTIFIER) sections.each do |section| if section[:type] == 1 identifier = section[:payload][:identifier] cmd[identifier] ||= [] cmd[identifier] += section[:payload][:sequence] end end end end
# File lib/mongo/protocol/msg.rb, line 146 def global_args @global_args ||= (sections[0] || {}) end