导读:本篇文章是使用 InfluxDB 3 和 Grafana 将智能家居设备的数据集中到统一平台的分步干货技术指南哦。
各位伙伴们,你知道家里的智能家居设备会产生大量分散的数据。
那么,本教程将向大家展示如何使用 InfluxDB 3 和 Grafana 将这些数据集中到一个统一的平台。我们不仅可以追踪家中的生命体征,还可以学习专业的软件开发概念,例如时间序列数据库设计和构建适用于各种监控和分析系统的弹性数据管道。
在开始之前,请确保您已经熟悉如下技术或知识储备:
这种结构使得 InfluxDB 非常适合处理物联网数据,因为它针对基于时间的查询的写入密集型工作负载进行了优化。
我们用一种称为“行协议”的语法来定义它,请看如下所示:
1 2 | weather,location=london,season=summer temperature=301465839830100400200 |
来理解此语法:
InfluxData 是领先的时间序列平台 InfluxDB 的创建者。它收集、存储和分析各种规模的时间序列数据。开发者可以查询和分析带有时间戳的数据,从而实时进行预测、响应和调整。
来,我们进行如下的代码编写步骤:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | # Pull image from Docker for InfluxDB 3 Enterprise docker pull influxdb:3-enterprise
# Run InfluxDB 3 Enterprise with proper configuration docker run-d\ --name influxdb3-enterprise\ -p8181:8181\ -v$PWD/data:/var/lib/influxdb3/data\ -v$PWD/plugins:/var/lib/influxdb3/plugins\ -eINFLUXDB3_ENTERPRISE_LICENSE_EMAIL=you@example.com\ influxdb:3-enterprise\ influxdb3 serve\ --node-id=node0\ --cluster-id=cluster0\ --object-store=file\ --data-dir=/var/lib/influxdb3/data\ --host=0.0.0.0\ --port=8181 |
接下来,我们将专注于创建一个全面的数据收集器,以演示任何物联网集成所需的所有模式。
本示例使用 Nest 恒温器,但其原理适用于任何智能设备或 API。我们将构建一个单一的收集器,用于轮询Google Nest API并使用 v3 Python 客户端写入 InfluxDB 3 Enterprise。
1 | influxdb3 create database home-data |
2. 开始Nest API 设置。要从 Nest 恒温器收集数据,您需要 API 访问权限,本例中我们使用Google云平台,其它的云平台类似。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | # get_nest_token.py importrequests importwebbrowser fromurllib.parse importurlencode
CLIENT_ID="your-client-id-here" CLIENT_SECRET="your-client-secret-here"
# Generate authorization URL auth_url=f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode({ 'client_id': CLIENT_ID, 'redirect_uri': 'http://localhost', 'response_type': 'code', 'scope': 'https://www.googleapis.com/auth/sdm.service', 'access_type': 'offline' })}"
print(f"Visit: {auth_url}") webbrowser.open(auth_url)
# Get authorization code from redirect URL auth_code=input("Enter the code from the redirect URL: ")
# Exchange for tokens token_response=requests.post('https://oauth2.googleapis.com/token',data={ 'client_id':CLIENT_ID, 'client_secret':CLIENT_SECRET, 'code':auth_code, 'grant_type':'authorization_code', 'redirect_uri':'http://localhost' })
tokens=token_response.json() print(f"Access Token: {tokens['access_token']}") print(f"Refresh Token: {tokens['refresh_token']}") |
1. 创建.env文件,然后请保存到本地。
1 2 3 4 5 | NEST_ACCESS_TOKEN=your_access_token_here GOOGLE_CLOUD_PROJECT_ID=your_project_id INFLUXDB_HOST=http://localhost:8181 INFLUXDB_TOKEN=your_influxdb_token INFLUXDB_DATABASE=home-data |
1 | pip install influxdb3-python requests python-dotenv |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | # nest_collector.py importos importtime importlogging fromdatetimeimportdatetime,timezone fromfunctoolsimportwraps importrequests frominfluxdb_client_3 importInfluxDBClient3 fromdotenv importload_dotenv load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) defretry_on_failure(max_retries=3,delay=5): defdecorator(func): @wraps(func) defwrapper(*args,**kwargs): forattempt inrange(max_retries): try: returnfunc(*args,**kwargs) exceptExceptionase: ifattempt==max_retries-1: logging.error(f"{func.__name__} failed: {e}") raise logging.warning(f"Retry {attempt + 1}: {e}") time.sleep(delay) returnwrapper returndecorator classNestCollector: def__init__(self): self.access_token=os.getenv("NEST_ACCESS_TOKEN") self.project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID") ifnotself.access_token ornotself.project_id: raiseValueError("Missing NEST_ACCESS_TOKEN or GOOGLE_CLOUD_PROJECT_ID in .env file") # Initialize InfluxDB 3 client self.client=InfluxDBClient3( host=os.getenv("INFLUXDB_HOST","http://localhost:8181"), token=os.getenv("INFLUXDB_TOKEN"), database=os.getenv("INFLUXDB_DATABASE","home-data"), ) # Test connection try: list(self.client.query("SELECT 1",language="sql")) logging.info("InfluxDB connection successful") exceptExceptionase: logging.error(f"InfluxDB connection failed: {e}") raise @retry_on_failure(max_retries=3,delay=5) defget_thermostat_data(self): """Fetch data from Nest API""" url=f"https://smartdevicemanagement.googleapis.com/v1/enterprises/{self.project_id}/devices" headers={ "Authorization":f"Bearer {self.access_token}", "Content-Type":"application/json" } response=requests.get(url,headers=headers,timeout=30) response.raise_for_status() devices=response.json().get("devices",[]) data_points=[] fordeviceindevices: if"THERMOSTAT"notindevice.get("type",""): continue traits=device.get("traits",{}) device_id=device.get("name","").split("/")[-1] # Extract measurements temp_trait=traits.get("sdm.devices.traits.Temperature",{}) humidity_trait=traits.get("sdm.devices.traits.Humidity",{}) hvac_trait=traits.get("sdm.devices.traits.ThermostatHvac",{}) setpoint_trait=traits.get("sdm.devices.traits.ThermostatTemperatureSetpoint",{}) info_trait=traits.get("sdm.devices.traits.Info",{}) try: temp_celsius=float(temp_trait.get("ambientTemperatureCelsius",0)) humidity=float(humidity_trait.get("ambientHumidityPercent",0)) except(TypeError,ValueError): continue # Build data point for InfluxDB point={ "measurement":"nest_thermostat", "tags":{ "device_id":device_id, "room":info_trait.get("customName","main"), "device_type":"thermostat" }, "fields":{ "temperature_celsius":temp_celsius, "temperature_fahrenheit":temp_celsius*9/5+32, "humidity_percent":humidity, "hvac_status":hvac_trait.get("status","OFF"), "hvac_mode":hvac_trait.get("mode","UNKNOWN") }, "time":int(datetime.now(timezone.utc).timestamp()) } # Add setpoint temperatures if available if"heatCelsius"insetpoint_trait: heat_c=float(setpoint_trait["heatCelsius"]) point["fields"]["heat_setpoint_celsius"]=heat_c point["fields"]["heat_setpoint_fahrenheit"]=heat_c*9/5+32 if"coolCelsius"insetpoint_trait: cool_c=float(setpoint_trait["coolCelsius"]) point["fields"]["cool_setpoint_celsius"]=cool_c point["fields"]["cool_setpoint_fahrenheit"]=cool_c*9/5+32 data_points.append(point) logging.info(f"Collected {device_id}: {temp_celsius:.1f}°C, {humidity:.0f}%") returndata_points defwrite_to_influx(self,points): """Write data to InfluxDB""" ifnotpoints: logging.warning("No data to write") return success_count=0 forpoint inpoints: try: self.client.write(record=point,write_precision="s") success_count+=1 exceptExceptionase: logging.error(f"Write failed: {e}") logging.info(f"Wrote {success_count}/{len(points)} points") defrun_cycle(self): """Run one collection cycle""" try: data=self.get_thermostat_data() self.write_to_influx(data) exceptExceptionase: logging.error(f"Cycle failed: {e}") if__name__=="__main__": collector=NestCollector() try: whileTrue: collector.run_cycle() time.sleep(300) # Run every 5 minutes exceptKeyboardInterrupt: logging.info("Stopped by user") |
1 2 3 4 5 6 7 | # Install Grafana using Docker docker run-d\ --name grafana\ -p3000:3000\ -vgrafana-storage:/var/lib/grafana\ -e"GF_SECURITY_ADMIN_PASSWORD=your-secure-password"\ grafana/grafana:latest |
INFLUXDB_TOKEN
令牌:从 .env 文件中粘贴环境变量的字符串值,并将“不安全连接”切换为“开”2. 可以尝试使用以下 SQL 查询创建具有两个面板的仪表板来监控数据:
1 2 3 4 5 6 7 | SELECT temperature_fahrenheit, device_id FROMnest_thermostat WHERE time>=now()-interval'5 minutes' ORDER BY timeDESC LIMIT1 |
1 2 3 4 5 6 7 | SELECT date_trunc('minute',time)astime, AVG(temperature_fahrenheit)asavg_temp FROMnest_thermostat WHERE time>=now()-interval'24 hours' GROUP BY date_trunc('minute',time) ORDER BY time |
(本级为可选项)健康监测脚本:通过创建脚本“health_check.py”进行简单检查,以便保持系统健康,如下代码所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | # health_check.py importrequests fromdatetimeimportdatetime
defcheck_health(): services={ 'InfluxDB':'http://localhost:8181/health', 'Grafana':'http://localhost:3000/api/health' }
print(f"\n=== Health Check - {datetime.now().strftime('%H:%M:%S')} ===")
all_healthy=True forservice,url inservices.items(): try: response=requests.get(url,timeout=5) healthy=response.status_code==200 status="✅"ifhealthy else"❌" print(f"{service}: {status}") all_healthy=all_healthy andhealthy exceptException: print(f"{service}: ❌ Connection failed") all_healthy=False
print(f"Overall: {'✅ HEALTHY' if all_healthy else '❌ ISSUES'}\n")
if__name__=="__main__": check_health() |
其实,我们在这里构建的远远不止监控恒温器,已经实现了支持现代大规模可观测系统的基础模式。
这里编写的用于处理不稳定的物联网 API 的重试逻辑和断路器,与在服务故障时维持 Netflix 正常运行的弹性模式相同,而创建的时间序列数据建模和可视化管道则反映了大型科技公司用于每秒跟踪数百万个指标的监控基础设施。
最重要的是,我们现在了解了如何将数据视为随时间推移的事件流,而不是表中的静态记录,这是一种思维方式的转变,无论你是构建应用程序监控仪表板、分析业务指标还是使用任何生成连续数据流的系统,它都会提供非常良好的服务。
作者:洛逸
本篇文章为 @ 行动的大雄 创作并授权 21CTO 发布,未经许可,请勿转载。
内容授权事宜请您联系 webmaster@21cto.com或关注 21CTO 公众号。
该文观点仅代表作者本人,21CTO 平台仅提供信息存储空间服务。