var db = require('./../lib/db'); var fs = require('fs'); var _ = require('underscore'); var async = require('async'); var convertDate = require('./../plugins/convertDate'); var findOneOrCreate = require('mongoose-find-one-or-create'); var agent_names_path = __dirname + '/data/agentsnames.json'; var shipper_names_path = __dirname + '/data/snippetsnames.json'; var pipe_names_path = __dirname + '/data/pipenames.json'; var agent_clean_names, shipper_clean_names, pipe_clean_names; var emptyCleanNames = { pipelines: [], shippers: [], agents: [], points: [] } var iter = 0; fs.readFile(agent_names_path, function(err, data) { // if (err) throw err; if (err) console.log ('readFile agent_names_path', err); agent_clean_names = _.object(JSON.parse(data)); }); fs.readFile(shipper_names_path, function(err, data) { // if (err) throw err; if (err) console.log ('readFile shipper_names_path', err); shipper_clean_names = _.object(JSON.parse(data)); }); fs.readFile(pipe_names_path, function(err, data) { // if (err) throw err; if (err) console.log ('readFile pipe_names_path', err); pipe_clean_names = _.object(JSON.parse(data)); }); var _ref = db.models, Agent = _ref.Agent, Footnote = _ref.Footnote, Pipeline = _ref.Pipeline, Shipper = _ref.Shipper, Point = _ref.Point; var path = __dirname + '/Documents'; var fields = { 'H': ['tab_key', 'pipeline_name', 'pipe_id', 'report_date', 'original_revised_indicator', 'quarter', 'unit_of_measurement_trans', 'unit_of_measurement_store', 'contact', 'pipe_fnote_id'], 'P': ['tab_key', 'point_type', 'point_name', 'point_id_code_qualifier', 'point_id_code_qualifier', 'zone_name', 'point_transport_mdq', 'point_storage_msq', 'point_fnote_id'], 'D': ['tab_key', 'shipper_name', 'shipper_id', 'shipper_affiliate_ind', 'rate_schedule', 'contract_number', 'start_date', 'end_date', 'next_expiration_days', 'negotiated_rate_ind', 'contract_transport_mdq', 'contract_transport_msq', 'shipperfnote_id'], 'A': ['tab_key', 'agent_name', 'agent_affiliate_ind', 'agent_fnote_id'], 'F': ['tab_key', 'footnote_id', 'footnote_text'] } var footnote_fields = { 'H' : 'pipe_fnote', 'P' : 'point_fnote', 'D' : 'shipper_fnote', 'A' : 'agent_fnote' } var find_or_create = function(task, callback) { var filter = {}, search_model; if (task.tab_key == 'P') { filter.point_id_code_qualifier = task.model.point_id_code_qualifier; search_model = Point } else if (task.tab_key == 'A') { if (task.model.agent_name_clean) { filter.agent_name_clean = task.model.agent_name_clean } else { filter.agent_name = task.model.agent_name } search_model = Agent } else if (task.tab_key == 'D') { if (task.model.shipper_name_clean) { filter.shipper_name_clean = task.model.shipper_name_clean } else { filter.shipper_name = task.model.shipper_name } filter.start_date = task.model.start_date filter.end_date = task.model.end_date filter.contract_transport_mdq = task.model.contract_transport_mdq filter.contract_storage_msq = task.model.contract_storage_msq search_model = Shipper } else if (task.tab_key == 'F') { filter.footnote_text = task.model.footnote_text search_model = Footnote } search_model.findOneOrCreate(filter, task.model, function(err, model) { callback(err, model && model['_id']); }); }; var updateAll = function (mdls, fCb) { async.waterfall([ function(innerCb){ async.eachSeries(mdls.points, function(pnt, cb_p){ Point.findById(pnt.id, function (err, modelPoint) { if (err) console.log('ERROR!', err); modelPoint.parent_pipeline = pnt.parent_pipeline_id; modelPoint.parent_shipper = pnt.parent_shipper_id; modelPoint.save(function () { cb_p(null); }); }); }, function(err){ innerCb(null) }); }, function(innerCb){ async.eachSeries(mdls.agents, function(agnt, cb_p){ Agent.findById(agnt.id, function (err, modelAgent) { if (err) console.log('ERROR!', err); modelAgent.parent_pipeline = agnt.parent_pipeline_id; modelAgent.parent_shipper = agnt.parent_shipper_id; modelAgent.save(function () { cb_p(null); }); }); }, function(err){ innerCb(null) }); }, function(innerCb){ async.eachSeries(mdls.shippers, function(shppr, cb_p){ Shipper.findById(shppr.id, function (err, modelShipper) { if (err) console.log('ERROR!', err); modelShipper.parent_pipeline_id = shppr.parent_pipeline_id; modelShipper.save(function () { cb_p(null) }); }); }, function(err){ innerCb(null) }); } ],function(err){ fCb(); }) }; module.exports = { savePoints: function(cb) { var self = this; var arr = self.points.map(function(point, i) { var task = {}; task.tab_key = point.tab_key task.model = point return function(cb) { find_or_create(task, function(err, id) { if (!err) self.points[i]['id'] = id else console.log('ERROR: ', err); cb(err, id) }) } }) async.series(arr, function(err, results) { cb(err, results) }) }, saveFootnotes: function (cb) { var self = this; var arr = self.footnotes.map(function(footnote, i) { var task = {} task.tab_key = footnote.tab_key task.model = footnote return function(cb) { find_or_create(task, function(err, id) { self.footnotes[i]['id'] = id cb(err, id) }) } }) async.series(arr, function(err, results) { cb(err, results) }) }, saveAgents: function(cb) { var self = this; var arr = self.agents.map(function(agent, i) { var task = {}; var name = agent.agent_name.replace(new RegExp('"', "g"), '') agent.agent_name_clean = agent_clean_names[name] || '' if(!agent_clean_names[name]) { // emptyCleanNames.agents.push(agent.agent_name) // fs.appendFile(__dirname + "/data/agent_names.json", agent.agent_name + '\n', function(err) { // if(err) console.log(err); // }); } task.tab_key = agent.tab_key task.model = agent return function(cb) { find_or_create(task, function(err, id) { if(!err) self.agents[i]['id'] = id else console.log('ERROR:',err) cb(err, id) }) } }) async.series(arr, function(err, results) { cb(err, results) }) }, insertFootnotes: function(cb) { var self = this; self.footnotes.forEach(function (footnote) { var num = +footnote.footnote_id -1 if (!num || num<0) return var obj = self.data_to_parse[num] obj[footnote_fields[obj.tab_key]] = footnote.id }) return cb() }, saveShippers: function(cb) { var self = this; var arr = self.shippers.map(function(shipper, i ) { var task = {}; // var name = shipper.shipper_name.replace(new RegExp('"', "g"), '') var name = shipper.shipper_name shipper.shipper_name_clean = shipper_clean_names[name] || '' if(!shipper_clean_names[name]) { // emptyCleanNames.shippers.push(shipper.shipper_name) // fs.appendFile(__dirname + "/data/shipper_names.json", shipper.shipper_name + '\n', function(err) { // if(err) console.log(err); // }); } var points_id = self.points.filter(function(d) { return d.d_number == i }).map(function(d) {return d.id}) var agents_id = self.agents.filter(function(d) { return d.d_number == i }).map(function(d) {return d.id}) shipper.point_count = points_id.length shipper.points = points_id shipper.agent_count = agents_id.length shipper.agents = agents_id shipper.end_date = convertDate(shipper.end_date); shipper.start_date = convertDate(shipper.start_date); task.tab_key = shipper.tab_key task.model = shipper return function(cb) { find_or_create(task, function(err, id) { if(!err){ self.shippers[i]['id'] = id var points = self.points.filter(function (d) { return self.shippers[i].points.indexOf(d.id)!=-1 }) points.forEach(function(p) { p.parent_shipper_id = self.shippers[i].id }) var agents = self.agents.filter(function (d) { return self.shippers[i].agents.indexOf(d.id)!=-1 }) agents.forEach(function(a) { a.parent_shipper_id = self.shippers[i].id }) cb(err, id) } else { console.log('ERROR: ', err) cb(err, null); } }) } }) async.series(arr, function(err, results) { cb(err, results) }) }, savePipeline: function (cb) { var self = this; var pipeline = self.pipeline // var name = pipeline.pipeline_name.replace(new RegExp('"', "g"), '') var name = pipeline.pipeline_name pipeline.pipeline_name_clean = pipe_clean_names[name] || '' if(!pipe_clean_names[name]) { // emptyCleanNames.pipelines.push(pipeline.pipeline_name) // fs.appendFile(__dirname + "/data/pipeline_names.json", pipeline.pipeline_name + '\n', function(err) { // if(err) console.log(err); // }); } var shippers_id = self.shippers.map(function(d) { return d.id }) pipeline.shipper_count = shippers_id.length; pipeline.shippers = shippers_id pipeline.quarter = convertDate(pipeline.quarter); pipeline.report_date = convertDate(pipeline.report_date); Pipeline.findOneOrCreate({'pipe_id' : pipeline.pipe_id, 'quarter': pipeline.quarter}, pipeline, function(err, model) { if(!err) { self.points.forEach(function(p){ p.parent_pipeline_id = model['_id']; p.parent_pipeline = model['pipe_id'] }) self.agents.forEach(function(a){ a.parent_pipeline_id = model['_id']; a.parent_pipeline = model['pipe_id'] }) self.shippers.forEach(function(s){ s.parent_pipeline_id = model['_id']; s.parent_pipeline = model['pipe_id'] }) cb(err, model['_id']); } else { console.log('ERROR:', err) cb(err, null); } }); }, workWithFile: function (file, cb) { console.log('fileStart', file) var self = this; var init_d_number = -1; fs.readFile(file, function(err, data) { if (err) console.log('ERR', err)//throw err; var data_to_parse = data.toString().split('\r\n') self.data_to_parse = data_to_parse.map(function(str) { str = str.split('\t') var key = str[0].replace(/(^\s+|\s+$)/g,''); var arr_fields = fields[key] || [] var result = _.object(arr_fields, str) if (key == 'D') { init_d_number++; } else if (key != 'H' && key != 'F') { result.d_number = init_d_number } return result }) async.waterfall([ function(innerCb){ self.footnotes = self.data_to_parse.filter(function(d) { return d['tab_key'] == "F" }) innerCb(null) }, function(innerCb){ self.points = self.data_to_parse.filter(function(d) { return d['tab_key'] == "P" }) innerCb(null) }, function(innerCb){ self.agents = self.data_to_parse.filter(function(d) { return d['tab_key'] == "A" }) innerCb(null) }, function(innerCb){ self.shippers = self.data_to_parse.filter(function(d) { return d['tab_key'] == "D" }) innerCb(null) }, function(innerCb){ self.pipeline = self.data_to_parse[0] innerCb(null) }],function(err){ self.saveFootnotes(function(err, foot_ids) { self.insertFootnotes(function() { self.savePoints(function(err, points_ids) { self.saveAgents(function(err, agents_ids) { self.saveShippers(function(err, shipper_ids) { self.savePipeline(function (err, pipe_id) { updateAll({points: self.points, agents: self.agents, shippers: self.shippers}, function () { fs.unlink(file, function(err) { if (err) console.log('some happen with file', file); else { console.log('fileEnd', file, iter++) return cb && cb() } }) }) }) }) }) }) }) }) }) }) }, cleanNames: function () { var dirtAg = Agent.find({"agent_name_clean": ""}, function (err, agents) { var arr = agents.map(function (agent, i) { return function(cb) { var name = agent.agent_name.replace(new RegExp('"', "g"), '') agent.agent_name_clean = agent_clean_names[name] || '' if (agent.agent_name_clean.length>0) { agent.save(function(err) { console.log('save', i) return cb(err, i) }) } else { console.log(name) return cb(null, i) } } }) async.series(arr, function(err, results) { console.log(err) }) }) }, runParse: function(cb) { var self = this; console.log('start parsing') try { fs.readdir(path, function(err, files) { var arr = files.map(function (fileName) { var thisFile = path + '/' + fileName return function (innerCb) { if (fileName == '.DS_Store') return innerCb(); self.workWithFile(thisFile, function () { return innerCb() }) } }); async.series(arr, function(err, results) { console.log("SAVED."); cb && cb('OK'); }); }) } catch (err){ console.log('ERROR: ', err) } } }