Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,19 @@ function logTransform2 (jsonObj) {
return (`${line.join(',')}\n`)
}

async function processLogs (bucket, filename, callback) {
async function processLogs (bucket, filename) {
console.log('Node version is: ' + process.version)
console.log('BUCKET ' + bucket)
console.log('FILENAME ' + filename)
let processedFile = filename.split('.')[0]
processedFile = processedFile.split('_')[0].concat('_', processedFile.split('_')[1])

const filePrefix = filename.split('.')[0]
const processedFile = filePrefix.split('_').slice(0, 2).join('_')

console.log('PROCESSEDFILENAME ' + processedFile)
createPipeline(bucket, filename, processedFile, callback)
return createPipeline(bucket, filename, processedFile)
}

function createPipeline (bucket, filename, processedFile, callback) {
async function createPipeline (bucket, filename, processedFile) {
const storage = new Storage({
keyFilename: 'metrics-processor-service-key.json'
})
Expand All @@ -147,47 +149,53 @@ function createPipeline (bucket, filename, processedFile, callback) {
const readBucket = storage.bucket(bucket)
const writeBucket = storage.bucket('processed-logs-nodejs')

readBucket.file(filename).download(function (err, contents) {
if (err) {
console.log('ERROR IN DOWNLOAD ', filename, err)
// callback(500)
callback()
} else {
const stringContents = contents.toString()
console.log('String length: ', stringContents.length)
const contentsArray = stringContents.split('\n')
console.log('Array Length: ', contentsArray.length)
let results = ''
for (const line of contentsArray) {
if (line.length === 0) {
continue
try {
const contents = await readBucket.file(filename).download()

const stringContents = contents.toString()
console.log(`String length: ${stringContents.length}`)
const contentsArray = stringContents.split('\n').filter(line => line.length > 0)
console.log(`Array Length: ${contentsArray.length}`)

let results = ''
for (const line of contentsArray) {
try {
const jsonparse = JSON.parse(line)
const printout = logTransform2(jsonparse)
if (printout !== undefined) {
results = results.concat(printout)
}
try {
const jsonparse = JSON.parse(line)
const printout = logTransform2(jsonparse)
if (printout !== undefined) { results = results.concat(printout) }
} catch (err) { console.log(err) }
} catch (err) {
console.log(err)
}
}

writeBucket.file(processedFile).save(results, function (err) {
if (err) {
console.log('ERROR UPLOADING: ', err)
const used = process.memoryUsage()
for (const key in used) {
console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`)
}
callback(500)
} else {
console.log('Upload complete')
const used = process.memoryUsage()
for (const key in used) {
console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`)
}
callback(200)
}
})
try {
await writeBucket.file(processedFile).save(results);
console.log(`Upload complete: ${processedFile}`)
return {
statusCode: 200,
message: `Upload complete: ${processedFile}`
}
} catch (err) {
console.log(`ERROR UPLOADING ${processedFile} - ${err}`)
return {
statusCode: 500,
message: `Error uploading file: ${processedFile}`
}
} finally {
const used = process.memoryUsage()
for (const key in used) {
console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`)
}
}
})
} catch (err) {
console.log('ERROR IN DOWNLOAD ', filename, err)
return {
statusCode: 500,
message: `Error downloading file: ${filename}`
}
}
}

app.post('/', async (req, res) => {
Expand Down Expand Up @@ -215,9 +223,8 @@ app.post('/', async (req, res) => {
const bucket = req.body.message.attributes.bucketId
const filename = req.body.message.attributes.objectId
console.log('EVENT TYPE: ', eventType)
processLogs(bucket, filename, function (status) {
res.status(status).send()
})
const { statusCode, message } = await processLogs(bucket, filename)
res.status(statusCode).send(message)
})

const port = process.env.PORT || 8080
Expand Down