Serverless Job 定时获取新闻热搜详细步骤和故障排查

Serverless Job 定时获取新闻热搜详细步骤和故障排查

发布时间:2024-10-13 01:13:09

让我们深入探讨这个serverless job的每个部分,以及如何进行更全面的实现和优化。

// serverless.yml
service: hot-news-fetcher

provider:
name: aws
runtime: nodejs14.x
stage: ${opt:stage, 'dev'}
region: ${opt:region, 'us-east-1'}
environment:
dynamodb_table: ${self:service}-${self:provider.stage}
iamrolestatements:
- effect: allow
action:
- dynamodb:putitem
resource: "arn:aws:dynamodb:${self:provider.region}:*:table/${self:provider.environment.dynamodb_table}"

functions:
fetchhotnews:
handler: handler.fetchhotnews
events:
- schedule: rate(1 hour)
timeout: 30 # 设置超时时间为30秒
memorysize: 256 # 设置内存大小为256mb

resources:
resources:
hotnewstable:
type: aws::dynamodb::table
properties:
tablename: ${self:provider.environment.dynamodb_table}
attributedefinitions:
- attributename: id
attributetype: s
keyschema:
- attributename: id
keytype: hash
billingmode: pay_per_request

// handler.js
const axios = require('axios');
const aws = require('aws-sdk');
const dynamodb = new aws.dynamodb.documentclient();

module.exports.fetchhotnews = async (event) => {
console.log('fetching hot news started');
try {
// 1. 从新闻api获取热搜数据
const response = await axios.get('https://api.example.com/hot-news', {
timeout: 5000 // 设置5秒超时
});
const hotnews = response.data;

console.log(`fetched ${hotnews.length} hot news items`);

// 2. 处理并存储数据
const batchsize = 25; // dynamodb 批量写入上限
for (let i = 0; i < hotnews.length; i += batchsize) {
const batch = hotnews.slice(i, i + batchsize);
const putrequests = batch.map(news => ({
putrequest: {
item: {
id: news.id,
title: news.title,
url: news.url,
timestamp: new date().toisostring(),
category: news.category || 'uncategorized',
source: news.source || 'unknown'
}
}
}));

await dynamodb.batchwrite({
requestitems: {
[process.env.dynamodb_table]: putrequests
}
}).promise();

console.log(`stored batch ${i / batchsize + 1}`);
}

return {
statuscode: 200,
body: json.stringify({ message: `successfully fetched and stored ${hotnews.length} hot news items` })
};
} catch (error) {
console.error('error fetching hot news:', error);

// 细分错误类型
if (error.response) {
// api 响应错误
console.error(`api responded with status ${error.response.status}`);
} else if (error.request) {
// 请求未收到响应
console.error('no response received from api');
} else {
// 其他错误
console.error('error setting up request:', error.message);
}

return {
statuscode: 500,
body: json.stringify({ message: 'error fetching hot news', error: error.message })
};
}
};

 

现在让我们详细解释这个增强版的serverless job:

  1. serverless.yml 配置文件: a. 环境变量:
    yaml

    environment:
    dynamodb_table: ${self:service}-${self:provider.stage}

    这里我们定义了一个环境变量来存储dynamodb表名,它会根据服务名和部署阶段动态生成。 b. iam 角色权限:
    yaml

    iamrolestatements:
    - effect: allow
    action:
    - dynamodb:putitem
    resource: "arn:aws:dynamodb:${self:provider.region}:*:table/${self:provider.environment.dynamodb_table}"

    这个配置给予lambda函数访问dynamodb的必要权限。 c. 函数配置:
    yaml

    timeout: 30
    memorysize: 256

    我们设置了更长的超时时间和更大的内存,以确保函数有足够的资源处理数据。 d. dynamodb 表资源:
    yaml

    resources:
    resources:
    hotnewstable:
    type: aws::dynamodb::table
    properties:
    tablename: ${self:provider.environment.dynamodb_table}
    attributedefinitions:
    - attributename: id
    attributetype: s
    keyschema:
    - attributename: id
    keytype: hash
    billingmode: pay_per_request

    这个配置会自动创建dynamodb表,使用id作为主键,并设置为按需付费模式。
  2. handler.js 处理函数: a. 错误处理: 我们增加了更详细的错误处理逻辑,区分api响应错误、网络错误和其他类型的错误。这有助于更快地定位问题。 b. 批量写入:
    javascript

    const batchsize = 25;
    for (let i = 0; i < hotnews.length; i += batchsize) {
    const batch = hotnews.slice(i, i + batchsize);
    // ... 批量写入逻辑
    }

    我们使用批量写入来提高效率,每次最多写入25个项目(dynamodb的限制)。 c. 日志记录: 增加了更多的日志记录,包括开始执行、获取的新闻数量、每批存储的进度等。这有助于监控和调试。 d. 数据丰富:
    javascript

    item: {
    // ... 其他字段
    category: news.category || 'uncategorized',
    source: news.source || 'unknown'
    }

    我们添加了额外的字段(category和source),使存储的数据更加丰富。如果原始数据中没有这些字段,我们提供了默认值。
  3. 优化和最佳实践: a. api 请求超时:
    javascript

    const response = await axios.get('https://api.example.com/hot-news', {
    timeout: 5000 // 设置5秒超时
    });

    设置一个合理的超时时间可以防止函数因为api响应慢而长时间挂起。 b. 错误处理和日志: 详细的错误处理和日志记录可以帮助快速识别和解决问题。 c. 资源配置: 根据实际需求配置了合适的超时时间和内存大小。 d. 动态表名: 使用动态生成的表名允许在不同的环境(开发、测试、生产)中使用不同的表。
  4. 进一步优化建议: a. 数据验证: 在存储数据之前,可以添加数据验证逻辑,确保数据格式正确。 b. 重试机制: 对于api请求和dynamodb操作,可以实现重试逻辑以提高可靠性。 c. 监控和告警: 设置cloudwatch告警,监控函数的错误率、执行时间等指标。 d. 缓存: 如果api调用昂贵或频繁,可以考虑实现缓存机制。 e. 数据去重: 在插入数据前检查是否已存在,避免重复数据。

要有效地排查故障,您可以:

  1. 使用cloudwatch logs insights分析日志模式。
  2. 设置x-ray跟踪来详细分析函数执行过程。
  3. 使用aws lambda power tuning工具找到最佳的内存配置。
  4. 在本地使用serverless-offline插件模拟aws环境进行测试。

这个增强版的serverless job提供了更多的错误处理、性能优化和可靠性保证。它应该能够更好地处理各种边缘情况,并提供更丰富的日志信息用于故障排查。

感谢提供:05互联