"use strict";
|
|
|
|
var Long = require('mongodb-core').BSON.Long,
|
|
Timestamp = require('mongodb-core').BSON.Timestamp;
|
|
|
|
// Error codes
|
|
var UNKNOWN_ERROR = 8;
|
|
var INVALID_BSON_ERROR = 22;
|
|
var WRITE_CONCERN_ERROR = 64;
|
|
var MULTIPLE_ERROR = 65;
|
|
|
|
// Insert types
|
|
var INSERT = 1;
|
|
var UPDATE = 2;
|
|
var REMOVE = 3
|
|
|
|
|
|
// Get write concern
|
|
var writeConcern = function(target, col, options) {
|
|
var writeConcern = {};
|
|
|
|
// Collection level write concern
|
|
if(col.writeConcern && col.writeConcern.w != null) writeConcern.w = col.writeConcern.w;
|
|
if(col.writeConcern && col.writeConcern.j != null) writeConcern.j = col.writeConcern.j;
|
|
if(col.writeConcern && col.writeConcern.fsync != null) writeConcern.fsync = col.writeConcern.fsync;
|
|
if(col.writeConcern && col.writeConcern.wtimeout != null) writeConcern.wtimeout = col.writeConcern.wtimeout;
|
|
|
|
// Options level write concern
|
|
if(options && options.w != null) writeConcern.w = options.w;
|
|
if(options && options.wtimeout != null) writeConcern.wtimeout = options.wtimeout;
|
|
if(options && options.j != null) writeConcern.j = options.j;
|
|
if(options && options.fsync != null) writeConcern.fsync = options.fsync;
|
|
|
|
// Return write concern
|
|
return writeConcern;
|
|
}
|
|
|
|
/**
|
|
* Helper function to define properties
|
|
* @ignore
|
|
*/
|
|
var defineReadOnlyProperty = function(self, name, value) {
|
|
Object.defineProperty(self, name, {
|
|
enumerable: true
|
|
, get: function() {
|
|
return value;
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Keeps the state of a unordered batch so we can rewrite the results
|
|
* correctly after command execution
|
|
* @ignore
|
|
*/
|
|
var Batch = function(batchType, originalZeroIndex) {
|
|
this.originalZeroIndex = originalZeroIndex;
|
|
this.currentIndex = 0;
|
|
this.originalIndexes = [];
|
|
this.batchType = batchType;
|
|
this.operations = [];
|
|
this.size = 0;
|
|
this.sizeBytes = 0;
|
|
}
|
|
|
|
/**
|
|
* Wraps a legacy operation so we can correctly rewrite it's error
|
|
* @ignore
|
|
*/
|
|
var LegacyOp = function(batchType, operation, index) {
|
|
this.batchType = batchType;
|
|
this.index = index;
|
|
this.operation = operation;
|
|
}
|
|
|
|
/**
|
|
* Create a new BulkWriteResult instance (INTERNAL TYPE, do not instantiate directly)
|
|
*
|
|
* @class
|
|
* @property {boolean} ok Did bulk operation correctly execute
|
|
* @property {number} nInserted number of inserted documents
|
|
* @property {number} nUpdated number of documents updated logically
|
|
* @property {number} nUpserted Number of upserted documents
|
|
* @property {number} nModified Number of documents updated physically on disk
|
|
* @property {number} nRemoved Number of removed documents
|
|
* @return {BulkWriteResult} a BulkWriteResult instance
|
|
*/
|
|
var BulkWriteResult = function(bulkResult) {
|
|
defineReadOnlyProperty(this, "ok", bulkResult.ok);
|
|
defineReadOnlyProperty(this, "nInserted", bulkResult.nInserted);
|
|
defineReadOnlyProperty(this, "nUpserted", bulkResult.nUpserted);
|
|
defineReadOnlyProperty(this, "nMatched", bulkResult.nMatched);
|
|
defineReadOnlyProperty(this, "nModified", bulkResult.nModified);
|
|
defineReadOnlyProperty(this, "nRemoved", bulkResult.nRemoved);
|
|
|
|
/**
|
|
* Return an array of inserted ids
|
|
*
|
|
* @return {object[]}
|
|
*/
|
|
this.getInsertedIds = function() {
|
|
return bulkResult.insertedIds;
|
|
}
|
|
|
|
/**
|
|
* Return an array of upserted ids
|
|
*
|
|
* @return {object[]}
|
|
*/
|
|
this.getUpsertedIds = function() {
|
|
return bulkResult.upserted;
|
|
}
|
|
|
|
/**
|
|
* Return the upserted id at position x
|
|
*
|
|
* @param {number} index the number of the upserted id to return, returns undefined if no result for passed in index
|
|
* @return {object}
|
|
*/
|
|
this.getUpsertedIdAt = function(index) {
|
|
return bulkResult.upserted[index];
|
|
}
|
|
|
|
/**
|
|
* Return raw internal result
|
|
*
|
|
* @return {object}
|
|
*/
|
|
this.getRawResponse = function() {
|
|
return bulkResult;
|
|
}
|
|
|
|
/**
|
|
* Returns true if the bulk operation contains a write error
|
|
*
|
|
* @return {boolean}
|
|
*/
|
|
this.hasWriteErrors = function() {
|
|
return bulkResult.writeErrors.length > 0;
|
|
}
|
|
|
|
/**
|
|
* Returns the number of write errors off the bulk operation
|
|
*
|
|
* @return {number}
|
|
*/
|
|
this.getWriteErrorCount = function() {
|
|
return bulkResult.writeErrors.length;
|
|
}
|
|
|
|
/**
|
|
* Returns a specific write error object
|
|
*
|
|
* @param {number} index of the write error to return, returns null if there is no result for passed in index
|
|
* @return {WriteError}
|
|
*/
|
|
this.getWriteErrorAt = function(index) {
|
|
if(index < bulkResult.writeErrors.length) {
|
|
return bulkResult.writeErrors[index];
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Retrieve all write errors
|
|
*
|
|
* @return {object[]}
|
|
*/
|
|
this.getWriteErrors = function() {
|
|
return bulkResult.writeErrors;
|
|
}
|
|
|
|
/**
|
|
* Retrieve lastOp if available
|
|
*
|
|
* @return {object}
|
|
*/
|
|
this.getLastOp = function() {
|
|
return bulkResult.lastOp;
|
|
}
|
|
|
|
/**
|
|
* Retrieve the write concern error if any
|
|
*
|
|
* @return {WriteConcernError}
|
|
*/
|
|
this.getWriteConcernError = function() {
|
|
if(bulkResult.writeConcernErrors.length == 0) {
|
|
return null;
|
|
} else if(bulkResult.writeConcernErrors.length == 1) {
|
|
// Return the error
|
|
return bulkResult.writeConcernErrors[0];
|
|
} else {
|
|
|
|
// Combine the errors
|
|
var errmsg = "";
|
|
for(var i = 0; i < bulkResult.writeConcernErrors.length; i++) {
|
|
var err = bulkResult.writeConcernErrors[i];
|
|
errmsg = errmsg + err.errmsg;
|
|
|
|
// TODO: Something better
|
|
if(i == 0) errmsg = errmsg + " and ";
|
|
}
|
|
|
|
return new WriteConcernError({ errmsg : errmsg, code : WRITE_CONCERN_ERROR });
|
|
}
|
|
}
|
|
|
|
this.toJSON = function() {
|
|
return bulkResult;
|
|
}
|
|
|
|
this.toString = function() {
|
|
return "BulkWriteResult(" + this.toJSON(bulkResult) + ")";
|
|
}
|
|
|
|
this.isOk = function() {
|
|
return bulkResult.ok == 1;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a new WriteConcernError instance (INTERNAL TYPE, do not instantiate directly)
|
|
*
|
|
* @class
|
|
* @property {number} code Write concern error code.
|
|
* @property {string} errmsg Write concern error message.
|
|
* @return {WriteConcernError} a WriteConcernError instance
|
|
*/
|
|
var WriteConcernError = function(err) {
|
|
if(!(this instanceof WriteConcernError)) return new WriteConcernError(err);
|
|
|
|
// Define properties
|
|
defineReadOnlyProperty(this, "code", err.code);
|
|
defineReadOnlyProperty(this, "errmsg", err.errmsg);
|
|
|
|
this.toJSON = function() {
|
|
return {code: err.code, errmsg: err.errmsg};
|
|
}
|
|
|
|
this.toString = function() {
|
|
return "WriteConcernError(" + err.errmsg + ")";
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a new WriteError instance (INTERNAL TYPE, do not instantiate directly)
|
|
*
|
|
* @class
|
|
* @property {number} code Write concern error code.
|
|
* @property {number} index Write concern error original bulk operation index.
|
|
* @property {string} errmsg Write concern error message.
|
|
* @return {WriteConcernError} a WriteConcernError instance
|
|
*/
|
|
var WriteError = function(err) {
|
|
if(!(this instanceof WriteError)) return new WriteError(err);
|
|
|
|
// Define properties
|
|
defineReadOnlyProperty(this, "code", err.code);
|
|
defineReadOnlyProperty(this, "index", err.index);
|
|
defineReadOnlyProperty(this, "errmsg", err.errmsg);
|
|
|
|
//
|
|
// Define access methods
|
|
this.getOperation = function() {
|
|
return err.op;
|
|
}
|
|
|
|
this.toJSON = function() {
|
|
return {code: err.code, index: err.index, errmsg: err.errmsg, op: err.op};
|
|
}
|
|
|
|
this.toString = function() {
|
|
return "WriteError(" + JSON.stringify(this.toJSON()) + ")";
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Merges results into shared data structure
|
|
* @ignore
|
|
*/
|
|
var mergeBatchResults = function(ordered, batch, bulkResult, err, result) {
|
|
// If we have an error set the result to be the err object
|
|
if(err) {
|
|
result = err;
|
|
} else if(result && result.result) {
|
|
result = result.result;
|
|
} else if(result == null) {
|
|
return;
|
|
}
|
|
|
|
// Do we have a top level error stop processing and return
|
|
if(result.ok == 0 && bulkResult.ok == 1) {
|
|
bulkResult.ok = 0;
|
|
|
|
var writeError = {
|
|
index: 0
|
|
, code: result.code || 0
|
|
, errmsg: result.message
|
|
, op: batch.operations[0]
|
|
};
|
|
|
|
bulkResult.writeErrors.push(new WriteError(writeError));
|
|
return;
|
|
} else if(result.ok == 0 && bulkResult.ok == 0) {
|
|
return;
|
|
}
|
|
|
|
// Deal with opTime if available
|
|
if(result.opTime || result.lastOp) {
|
|
var opTime = result.lastOp || result.opTime;
|
|
var lastOpTS = null;
|
|
var lastOpT = null;
|
|
|
|
// We have a time stamp
|
|
if(opTime && opTime._bsontype == 'Timestamp') {
|
|
if(bulkResult.lastOp == null) {
|
|
bulkResult.lastOp = opTime;
|
|
} else if(opTime.greaterThan(bulkResult.lastOp)) {
|
|
bulkResult.lastOp = opTime;
|
|
}
|
|
} else {
|
|
// Existing TS
|
|
if(bulkResult.lastOp) {
|
|
lastOpTS = typeof bulkResult.lastOp.ts == 'number'
|
|
? Long.fromNumber(bulkResult.lastOp.ts) : bulkResult.lastOp.ts;
|
|
lastOpT = typeof bulkResult.lastOp.t == 'number'
|
|
? Long.fromNumber(bulkResult.lastOp.t) : bulkResult.lastOp.t;
|
|
}
|
|
|
|
// Current OpTime TS
|
|
var opTimeTS = typeof opTime.ts == 'number'
|
|
? Long.fromNumber(opTime.ts) : opTime.ts;
|
|
var opTimeT = typeof opTime.t == 'number'
|
|
? Long.fromNumber(opTime.t) : opTime.t;
|
|
|
|
// Compare the opTime's
|
|
if(bulkResult.lastOp == null) {
|
|
bulkResult.lastOp = opTime;
|
|
} else if(opTimeTS.greaterThan(lastOpTS)) {
|
|
bulkResult.lastOp = opTime;
|
|
} else if(opTimeTS.equals(lastOpTS)) {
|
|
if(opTimeT.greaterThan(lastOpT)) {
|
|
bulkResult.lastOp = opTime;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// If we have an insert Batch type
|
|
if(batch.batchType == INSERT && result.n) {
|
|
bulkResult.nInserted = bulkResult.nInserted + result.n;
|
|
}
|
|
|
|
// If we have an insert Batch type
|
|
if(batch.batchType == REMOVE && result.n) {
|
|
bulkResult.nRemoved = bulkResult.nRemoved + result.n;
|
|
}
|
|
|
|
var nUpserted = 0;
|
|
|
|
// We have an array of upserted values, we need to rewrite the indexes
|
|
if(Array.isArray(result.upserted)) {
|
|
nUpserted = result.upserted.length;
|
|
|
|
for(var i = 0; i < result.upserted.length; i++) {
|
|
bulkResult.upserted.push({
|
|
index: result.upserted[i].index + batch.originalZeroIndex
|
|
, _id: result.upserted[i]._id
|
|
});
|
|
}
|
|
} else if(result.upserted) {
|
|
|
|
nUpserted = 1;
|
|
|
|
bulkResult.upserted.push({
|
|
index: batch.originalZeroIndex
|
|
, _id: result.upserted
|
|
});
|
|
}
|
|
|
|
// If we have an update Batch type
|
|
if(batch.batchType == UPDATE && result.n) {
|
|
var nModified = result.nModified;
|
|
bulkResult.nUpserted = bulkResult.nUpserted + nUpserted;
|
|
bulkResult.nMatched = bulkResult.nMatched + (result.n - nUpserted);
|
|
|
|
if(typeof nModified == 'number') {
|
|
bulkResult.nModified = bulkResult.nModified + nModified;
|
|
} else {
|
|
bulkResult.nModified = null;
|
|
}
|
|
}
|
|
|
|
if(Array.isArray(result.writeErrors)) {
|
|
for(i = 0; i < result.writeErrors.length; i++) {
|
|
|
|
writeError = {
|
|
index: batch.originalZeroIndex + result.writeErrors[i].index
|
|
, code: result.writeErrors[i].code
|
|
, errmsg: result.writeErrors[i].errmsg
|
|
, op: batch.operations[result.writeErrors[i].index]
|
|
};
|
|
|
|
bulkResult.writeErrors.push(new WriteError(writeError));
|
|
}
|
|
}
|
|
|
|
if(result.writeConcernError) {
|
|
bulkResult.writeConcernErrors.push(new WriteConcernError(result.writeConcernError));
|
|
}
|
|
}
|
|
|
|
//
|
|
// Clone the options
|
|
var cloneOptions = function(options) {
|
|
var clone = {};
|
|
var keys = Object.keys(options);
|
|
for(var i = 0; i < keys.length; i++) {
|
|
clone[keys[i]] = options[keys[i]];
|
|
}
|
|
|
|
return clone;
|
|
}
|
|
|
|
// Exports symbols
|
|
exports.BulkWriteResult = BulkWriteResult;
|
|
exports.WriteError = WriteError;
|
|
exports.Batch = Batch;
|
|
exports.LegacyOp = LegacyOp;
|
|
exports.mergeBatchResults = mergeBatchResults;
|
|
exports.cloneOptions = cloneOptions;
|
|
exports.writeConcern = writeConcern;
|
|
exports.INVALID_BSON_ERROR = INVALID_BSON_ERROR;
|
|
exports.WRITE_CONCERN_ERROR = WRITE_CONCERN_ERROR;
|
|
exports.MULTIPLE_ERROR = MULTIPLE_ERROR;
|
|
exports.UNKNOWN_ERROR = UNKNOWN_ERROR;
|
|
exports.INSERT = INSERT;
|
|
exports.UPDATE = UPDATE;
|
|
exports.REMOVE = REMOVE;
|