# Data Streams

> ⚠️ **Future Content:** The comprehensive technical documentation and advanced data streaming architectures below will be provided in future documentation updates. Enterprise-grade streaming integrations and high-throughput data processing patterns are being developed for production systems.

Data streams enable real-time data integration with your digital twins, allowing continuous updates from IoT devices, APIs, databases, and other external systems.
## Overview

Data streams are continuous flows of information that automatically integrate into vaults, keeping digital twins current and accurate. They bridge the gap between static documents and dynamic, real-world data.
### Key Features

- **Real-time Updates**: Data flows continuously as it's generated
- **Multiple Sources**: IoT devices, APIs, databases, webhooks
- **Automatic Verification**: Each data point is cryptographically verified
- **Flexible Schemas**: Support for structured and unstructured data
- **Scalable Processing**: Handle high-frequency data streams
- **Historical Storage**: Complete data history with timestamps
## Stream Types

### IoT Sensor Streams

Direct integration with IoT devices and sensor networks:

```javascript
const sensorStream = {
  name: "Temperature Monitoring",
  type: "iot_sensor",
  vaultId: "vault_abc123",
  source: {
    deviceId: "TEMP-SENSOR-001",
    protocol: "mqtt",
    endpoint: "mqtt://iot.company.com:1883",
    topic: "sensors/temperature/facility-a",
    credentials: {
      username: "sensor_user",
      password: "sensor_pass"
    }
  },
  schema: {
    temperature: { type: "number", unit: "celsius", range: [-40, 100] },
    humidity: { type: "number", unit: "percent", range: [0, 100] },
    timestamp: { type: "datetime", required: true },
    location: { type: "string", enum: ["warehouse", "office", "production"] },
    batteryLevel: { type: "number", unit: "percent", range: [0, 100] }
  },
  processing: {
    frequency: "1m",        // Process every minute
    aggregation: "average", // Average values within the time window
    validation: "strict",   // Validate against the schema
    alertThresholds: {
      temperature: { min: 18, max: 26 },
      humidity: { min: 40, max: 60 }
    }
  }
};

const stream = await filedgr.streams.create(sensorStream);
```
### API Data Streams

Pull data from REST APIs, databases, or other web services:

```javascript
const apiStream = {
  name: "Financial Data Feed",
  type: "api_pull",
  vaultId: "vault_financial_001",
  source: {
    endpoint: "https://api.company.com/financial/accounts",
    method: "GET",
    headers: {
      "Authorization": "Bearer ${API_TOKEN}",
      "Content-Type": "application/json"
    },
    schedule: "0 */6 * * *", // Every 6 hours
    timeout: 30000
  },
  schema: {
    accountId: { type: "string", required: true },
    balance: { type: "number", required: true },
    currency: { type: "string", required: true },
    lastTransaction: { type: "datetime" },
    status: { type: "string", enum: ["active", "frozen", "closed"] }
  },
  processing: {
    transform: `
      // Transform the API response into the stream format
      function transform(apiData) {
        return apiData.accounts.map(account => ({
          accountId: account.id,
          balance: account.current_balance,
          currency: account.currency_code,
          lastTransaction: account.last_transaction_date,
          status: account.status.toLowerCase(),
          timestamp: new Date().toISOString()
        }));
      }
    `,
    validation: "strict",
    deduplicate: true
  }
};
```
### Event Streams

Capture discrete events and state changes:

```javascript
const eventStream = {
  name: "Manufacturing Events",
  type: "event_stream",
  vaultId: "vault_manufacturing_001",
  source: {
    type: "webhook",
    endpoint: "https://webhook.filedgr.io/streams/${streamId}",
    authentication: "hmac-sha256",
    secret: "${WEBHOOK_SECRET}"
  },
  eventTypes: {
    production_started: {
      batchId: { type: "string", required: true },
      productType: { type: "string", required: true },
      quantity: { type: "number", min: 1 },
      startTime: { type: "datetime", required: true },
      operator: { type: "string" }
    },
    quality_check: {
      batchId: { type: "string", required: true },
      testType: { type: "string", required: true },
      result: { type: "string", enum: ["pass", "fail", "conditional"] },
      metrics: { type: "object" },
      inspector: { type: "string" },
      timestamp: { type: "datetime", required: true }
    },
    batch_completed: {
      batchId: { type: "string", required: true },
      completionTime: { type: "datetime", required: true },
      finalQuantity: { type: "number", min: 0 },
      qualityGrade: { type: "string", enum: ["A", "B", "C"] },
      notes: { type: "string" }
    }
  }
};
```
### Database Streams

Sync with existing database systems:

```javascript
const dbStream = {
  name: "Inventory Sync",
  type: "database",
  vaultId: "vault_inventory_001",
  source: {
    type: "postgresql",
    connection: {
      host: "db.company.com",
      port: 5432,
      database: "inventory",
      username: "${DB_USER}",
      password: "${DB_PASS}",
      ssl: true
    },
    query: `
      SELECT
        product_id,
        quantity_on_hand,
        location,
        last_updated,
        reorder_level,
        supplier_id
      FROM inventory
      WHERE last_updated > $1
      ORDER BY last_updated ASC
    `,
    schedule: "*/15 * * * *", // Every 15 minutes
    parameters: ["${lastProcessedTime}"],
    incrementalColumn: "last_updated"
  },
  schema: {
    productId: { type: "string", required: true },
    quantityOnHand: { type: "number", min: 0 },
    location: { type: "string", required: true },
    lastUpdated: { type: "datetime", required: true },
    reorderLevel: { type: "number", min: 0 },
    supplierId: { type: "string" }
  }
};
```
## Creating Data Streams

### Basic Stream Creation

```bash
curl -X POST "https://api.filedgr.io/streams" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Temperature Monitoring",
    "vaultId": "vault_abc123",
    "type": "iot_sensor",
    "source": {
      "deviceId": "TEMP-001",
      "protocol": "mqtt",
      "endpoint": "mqtt://sensors.company.com:1883",
      "topic": "facility/temperature"
    },
    "schema": {
      "temperature": {"type": "number", "unit": "celsius"},
      "timestamp": {"type": "datetime", "required": true}
    }
  }'
```
Response:

```json
{
  "streamId": "stream_xyz789",
  "name": "Temperature Monitoring",
  "vaultId": "vault_abc123",
  "type": "iot_sensor",
  "status": "created",
  "endpoint": "https://webhook.filedgr.io/streams/stream_xyz789",
  "createdAt": "2024-01-15T10:30:00Z"
}
```
### SDK Usage

```javascript
import { Filedgr } from '@filedgr/sdk';

const filedgr = new Filedgr({
  apiKey: process.env.FILEDGR_API_KEY,
  apiSecret: process.env.FILEDGR_API_SECRET
});

// Create an IoT sensor stream
const sensorStream = await filedgr.streams.create({
  name: "Environmental Monitoring",
  vaultId: "vault_facility_001",
  type: "iot_sensor",
  source: {
    deviceId: "ENV-SENSOR-001",
    protocol: "http",
    endpoint: "https://api.filedgr.io/streams/ingest",
    authentication: "api_key"
  },
  schema: {
    temperature: { type: "number", unit: "celsius" },
    humidity: { type: "number", unit: "percent" },
    co2: { type: "number", unit: "ppm" },
    timestamp: { type: "datetime", required: true }
  },
  processing: {
    frequency: "30s",
    aggregation: "latest",
    alerting: {
      co2: { threshold: 1000, condition: "greater_than" }
    }
  }
});

console.log(`Stream created: ${sensorStream.id}`);
```
## Data Ingestion

### Direct HTTP Ingestion

```bash
# Send data to the stream endpoint
curl -X POST "https://api.filedgr.io/streams/stream_xyz789/data" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "timestamp": "2024-01-15T10:30:00Z",
    "temperature": 23.5,
    "humidity": 65.2,
    "location": "warehouse-section-a"
  }'
```
### Batch Data Ingestion

```javascript
// Send multiple data points efficiently
const batchData = [
  {
    timestamp: "2024-01-15T10:30:00Z",
    temperature: 23.5,
    humidity: 65.2,
    co2: 425
  },
  {
    timestamp: "2024-01-15T10:31:00Z",
    temperature: 23.7,
    humidity: 64.8,
    co2: 430
  },
  {
    timestamp: "2024-01-15T10:32:00Z",
    temperature: 23.4,
    humidity: 65.5,
    co2: 435
  }
];

await filedgr.streams.ingestBatch("stream_xyz789", batchData);
```
### MQTT Integration

```javascript
// MQTT client for IoT devices
const mqtt = require('mqtt');

class IoTStreamClient {
  constructor(streamId, deviceId) {
    this.streamId = streamId;
    this.deviceId = deviceId;
    this.client = null;
  }

  async connect() {
    this.client = mqtt.connect('mqtt://iot.company.com:1883', {
      clientId: this.deviceId,
      username: 'device_user',
      password: 'device_pass'
    });

    this.client.on('connect', () => {
      console.log(`Device ${this.deviceId} connected`);
      this.startSending();
    });
  }

  startSending() {
    setInterval(() => {
      const data = {
        deviceId: this.deviceId,
        temperature: 20 + Math.random() * 10,
        humidity: 40 + Math.random() * 40,
        timestamp: new Date().toISOString()
      };

      this.client.publish(
        `sensors/data/${this.deviceId}`,
        JSON.stringify(data)
      );
    }, 30000); // Every 30 seconds
  }
}

// Usage
const iotClient = new IoTStreamClient("stream_xyz789", "TEMP-001");
await iotClient.connect();
```
## Stream Management

### Monitor Stream Status

```bash
curl -X GET "https://api.filedgr.io/streams/stream_xyz789/status" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
```
Response:

```json
{
  "streamId": "stream_xyz789",
  "status": "active",
  "lastDataReceived": "2024-01-15T10:45:30Z",
  "dataPointsToday": 1440,
  "errorCount": 0,
  "health": {
    "status": "healthy",
    "uptime": "99.8%",
    "avgLatency": "125ms"
  },
  "processing": {
    "queueLength": 5,
    "processingRate": "95.2/min",
    "lastProcessed": "2024-01-15T10:45:28Z"
  }
}
```
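A client polling this status endpoint can turn the response into a simple freshness check. The sketch below assumes the `lastDataReceived` field shown above; the five-minute threshold is an arbitrary example value.

```javascript
// Decide whether a stream looks stale based on its status payload.
// `status` is the parsed JSON response from GET /streams/{id}/status.
function isStreamStale(status, now = new Date(), maxAgeMs = 5 * 60 * 1000) {
  const last = new Date(status.lastDataReceived).getTime();
  return now.getTime() - last > maxAgeMs;
}
```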
### Update Stream Configuration

```javascript
// Modify stream settings
await filedgr.streams.update("stream_xyz789", {
  processing: {
    frequency: "1m",        // Changed from 30s to 1m
    aggregation: "average", // Changed from latest to average
    alerting: {
      temperature: {
        min: 18,
        max: 28,
        condition: "outside_range"
      }
    }
  }
});
```
Pause and Resume Streams
# Pause stream processing
curl -X POST "https://api.filedgr.io/streams/stream_xyz789/pause" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
# Resume stream processing
curl -X POST "https://api.filedgr.io/streams/stream_xyz789/resume" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
## Data Processing

### Real-time Transformations

```javascript
const streamProcessor = {
  name: "Environmental Data Processor",
  streamId: "stream_xyz789",
  transformations: [
    {
      name: "unit_conversion",
      function: `
        function convertUnits(data) {
          // Convert Fahrenheit to Celsius if needed
          if (data.temperatureUnit === 'F') {
            data.temperature = (data.temperature - 32) * 5 / 9;
            data.temperatureUnit = 'C';
          }
          return data;
        }
      `
    },
    {
      name: "quality_scoring",
      function: `
        function calculateQuality(data) {
          let score = 100;

          // Penalize extreme temperatures
          if (data.temperature < 18 || data.temperature > 26) {
            score -= 20;
          }

          // Penalize high CO2
          if (data.co2 > 1000) {
            score -= 30;
          }

          data.qualityScore = score;
          return data;
        }
      `
    }
  ],
  aggregations: [
    {
      name: "hourly_averages",
      timeWindow: "1h",
      fields: ["temperature", "humidity", "co2"],
      function: "average"
    },
    {
      name: "daily_extremes",
      timeWindow: "1d",
      fields: ["temperature"],
      functions: ["min", "max"]
    }
  ]
};

await filedgr.streams.setProcessor("stream_xyz789", streamProcessor);
```
### Alerting and Notifications

```javascript
const alertConfig = {
  streamId: "stream_xyz789",
  rules: [
    {
      name: "high_temperature_alert",
      condition: "temperature > 30",
      severity: "critical",
      cooldown: "15m", // Don't repeat for 15 minutes
      actions: [
        {
          type: "email",
          recipients: ["facilities@company.com"],
          template: "high_temperature",
          data: {
            location: "{{data.location}}",
            temperature: "{{data.temperature}}"
          }
        },
        {
          type: "webhook",
          url: "https://api.company.com/alerts",
          payload: {
            type: "environmental",
            severity: "critical",
            message: "Temperature exceeded safe threshold"
          }
        }
      ]
    },
    {
      name: "equipment_maintenance_due",
      condition: "equipmentHours > maintenanceThreshold",
      severity: "warning",
      actions: [
        {
          type: "create_ticket",
          system: "maintenance_system",
          priority: "medium",
          assignee: "maintenance_team"
        }
      ]
    }
  ]
};

await filedgr.streams.setAlerts("stream_xyz789", alertConfig);
```
## Query and Analytics

### Historical Data Queries

```bash
# Query historical data
curl -X GET "https://api.filedgr.io/streams/stream_xyz789/data" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -G \
  -d "start=2024-01-01T00:00:00Z" \
  -d "end=2024-01-31T23:59:59Z" \
  -d "fields=temperature,humidity" \
  -d "aggregation=hourly" \
  -d "limit=1000"
```
### Advanced Analytics

```javascript
// Perform analytics on stream data
const analytics = await filedgr.streams.analyze("stream_xyz789", {
  timeRange: {
    start: "2024-01-01T00:00:00Z",
    end: "2024-01-31T23:59:59Z"
  },
  analyses: [
    {
      type: "trend_analysis",
      field: "temperature",
      algorithm: "linear_regression"
    },
    {
      type: "anomaly_detection",
      fields: ["temperature", "humidity"],
      sensitivity: 0.8
    },
    {
      type: "correlation_analysis",
      fields: ["temperature", "humidity", "co2"],
      method: "pearson"
    },
    {
      type: "forecasting",
      field: "temperature",
      horizon: "7d",
      model: "arima"
    }
  ]
});

console.log('Analytics Results:', {
  trends: analytics.trends,
  anomalies: analytics.anomalies,
  correlations: analytics.correlations,
  forecast: analytics.forecast
});
```
### Real-time Dashboards

```javascript
// Create a real-time dashboard
const dashboard = {
  name: "Facility Monitoring Dashboard",
  streamIds: ["stream_xyz789", "stream_abc456"],
  widgets: [
    {
      type: "line_chart",
      title: "Temperature Trends",
      streamId: "stream_xyz789",
      field: "temperature",
      timeWindow: "24h",
      refreshInterval: "30s"
    },
    {
      type: "gauge",
      title: "Current Humidity",
      streamId: "stream_xyz789",
      field: "humidity",
      ranges: [
        { min: 0, max: 30, color: "red", label: "Too Low" },
        { min: 30, max: 70, color: "green", label: "Optimal" },
        { min: 70, max: 100, color: "red", label: "Too High" }
      ]
    },
    {
      type: "alert_panel",
      title: "Active Alerts",
      streamIds: ["stream_xyz789", "stream_abc456"],
      severities: ["critical", "warning"]
    }
  ],
  layout: {
    columns: 3,
    responsive: true
  }
};

const dashboardUrl = await filedgr.dashboards.create(dashboard);
console.log(`Dashboard available at: ${dashboardUrl}`);
```
## Integration Examples

### Manufacturing Line Integration

```javascript
// Monitor a production line in real time
class ProductionLineStream {
  constructor(lineId, vaultId) {
    this.lineId = lineId;
    this.vaultId = vaultId;
    this.streamId = null;
  }

  async initialize() {
    // Create the production monitoring stream
    const stream = await filedgr.streams.create({
      name: `Production Line ${this.lineId}`,
      vaultId: this.vaultId,
      type: "manufacturing",
      source: {
        type: "plc_integration",
        endpoint: `plc://line-${this.lineId}.factory.com:502`,
        protocol: "modbus_tcp",
        registers: {
          production_count: 40001,
          line_speed: 40002,
          quality_status: 40003,
          temperature: 40004,
          pressure: 40005
        }
      },
      schema: {
        productionCount: { type: "number", min: 0 },
        lineSpeed: { type: "number", unit: "units/min" },
        qualityStatus: { type: "string", enum: ["good", "warning", "error"] },
        temperature: { type: "number", unit: "celsius" },
        pressure: { type: "number", unit: "bar" },
        timestamp: { type: "datetime", required: true }
      },
      processing: {
        frequency: "10s",
        validation: "strict",
        alerting: {
          qualityStatus: {
            condition: "equals",
            value: "error",
            severity: "critical"
          },
          lineSpeed: {
            condition: "less_than",
            value: 50,
            severity: "warning"
          }
        }
      }
    });

    this.streamId = stream.id;
    return stream;
  }

  async getProductionMetrics(timeframe = "1d") {
    const data = await filedgr.streams.query(this.streamId, {
      timeRange: timeframe,
      aggregation: {
        productionCount: "sum",
        lineSpeed: "average",
        qualityStatus: "mode"
      }
    });

    return {
      totalProduction: data.productionCount,
      averageSpeed: data.lineSpeed,
      qualityScore: this.calculateQualityScore(data.qualityStatus),
      efficiency: (data.lineSpeed / 100) * 100 // Assuming 100 units/min is the line maximum
    };
  }

  calculateQualityScore(statusData) {
    const total = statusData.good + statusData.warning + statusData.error;
    return ((statusData.good + statusData.warning * 0.5) / total) * 100;
  }
}
```
### Smart Building Integration

```javascript
// Comprehensive building management system
class SmartBuildingStream {
  constructor(buildingId, vaultId) {
    this.buildingId = buildingId;
    this.vaultId = vaultId;
    this.streams = new Map();
  }

  async initializeStreams() {
    // HVAC system stream
    const hvacStream = await filedgr.streams.create({
      name: "HVAC Monitoring",
      vaultId: this.vaultId,
      type: "building_systems",
      source: {
        type: "bacnet",
        device: `BAC-HVAC-${this.buildingId}`,
        points: [
          "Zone1.Temperature",
          "Zone1.Humidity",
          "Zone1.Setpoint",
          "System.Energy.Consumption",
          "System.Status"
        ]
      },
      processing: {
        frequency: "5m",
        alerting: {
          energyConsumption: {
            condition: "greater_than",
            baseline: "historical_average * 1.2"
          }
        }
      }
    });

    // Occupancy stream
    const occupancyStream = await filedgr.streams.create({
      name: "Occupancy Monitoring",
      vaultId: this.vaultId,
      type: "occupancy",
      source: {
        type: "mqtt",
        endpoint: "mqtt://building.sensors.com:1883",
        topics: [
          `building/${this.buildingId}/occupancy/+/count`,
          `building/${this.buildingId}/access/+/events`
        ]
      },
      schema: {
        zone: { type: "string", required: true },
        occupantCount: { type: "number", min: 0 },
        accessEvents: { type: "array" },
        timestamp: { type: "datetime", required: true }
      }
    });

    // Energy management stream
    const energyStream = await filedgr.streams.create({
      name: "Energy Management",
      vaultId: this.vaultId,
      type: "energy",
      source: {
        type: "api_pull",
        endpoint: `https://api.energy.com/buildings/${this.buildingId}/meters`,
        schedule: "*/15 * * * *", // Every 15 minutes
        authentication: "bearer_token"
      },
      processing: {
        transformations: [
          {
            name: "cost_calculation",
            function: `
              function calculateCost(data) {
                const rateSchedule = {
                  peak: 0.15,   // $0.15/kWh during peak hours
                  offPeak: 0.08 // $0.08/kWh during off-peak
                };

                const hour = new Date(data.timestamp).getHours();
                const isPeak = hour >= 9 && hour <= 17;
                const rate = isPeak ? rateSchedule.peak : rateSchedule.offPeak;

                data.cost = data.consumption * rate;
                data.rateType = isPeak ? 'peak' : 'offPeak';
                return data;
              }
            `
          }
        ]
      }
    });

    this.streams.set("hvac", hvacStream.id);
    this.streams.set("occupancy", occupancyStream.id);
    this.streams.set("energy", energyStream.id);
  }

  async getBuildingInsights(timeframe = "1d") {
    const insights = {};

    // HVAC efficiency
    insights.hvac = await filedgr.streams.analyze(this.streams.get("hvac"), {
      timeRange: timeframe,
      analyses: ["energy_efficiency", "comfort_score"]
    });

    // Occupancy patterns
    insights.occupancy = await filedgr.streams.analyze(this.streams.get("occupancy"), {
      timeRange: timeframe,
      analyses: ["usage_patterns", "peak_hours"]
    });

    // Energy optimization
    insights.energy = await filedgr.streams.analyze(this.streams.get("energy"), {
      timeRange: timeframe,
      analyses: ["consumption_trends", "cost_optimization"]
    });

    return insights;
  }
}
```
## Best Practices

### Stream Design

- **Single responsibility**: Each stream should track one specific data source
- **Appropriate frequency**: Balance real-time needs against resource usage
- **Schema validation**: Define a clear data structure and validation rules
- **Error handling**: Plan for network failures and data quality issues
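As a sketch of the error-handling point, a sender can wrap its ingestion call in retries with exponential backoff so that transient network failures don't drop data. `sendFn` stands in for whatever SDK or HTTP call you actually use; the attempt count and delays are illustrative defaults.

```javascript
// Retry a flaky ingestion call with exponential backoff.
// sendFn: async function that ingests one data point (e.g. an SDK call).
async function ingestWithRetry(sendFn, dataPoint, maxAttempts = 3, baseDelayMs = 500) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await sendFn(dataPoint);
    } catch (err) {
      if (attempt === maxAttempts) throw err; // exhausted, surface the error
      const delay = baseDelayMs * 2 ** (attempt - 1); // 500ms, 1s, 2s, ...
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

For long outages, pair this with a local buffer so readings can be replayed once the endpoint recovers.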
### Data Quality

- **Input validation**: Validate data at the ingestion point
- **Duplicate detection**: Handle duplicate data points appropriately
- **Missing data handling**: Define how to handle gaps in the data
- **Quality scoring**: Implement data quality metrics and monitoring
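Duplicate detection can be as simple as filtering on a composite key before ingestion. The `deviceId` + `timestamp` key below is an assumption; use whatever combination uniquely identifies a reading in your schema.

```javascript
// Drop duplicate points before sending them to a stream.
// keyFn builds a uniqueness key for each point; first occurrence wins.
function deduplicate(points, keyFn = (p) => `${p.deviceId}:${p.timestamp}`) {
  const seen = new Set();
  return points.filter((p) => {
    const key = keyFn(p);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```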
### Performance

- **Batch processing**: Group multiple data points for efficiency
- **Appropriate storage**: Choose storage based on access patterns
- **Caching strategy**: Cache frequently accessed data
- **Resource monitoring**: Monitor CPU, memory, and network usage
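For batch processing, a small client-side buffer that flushes at a fixed size cuts per-request overhead. `flushFn` stands in for a batch ingestion call such as `ingestBatch`; the batch size is an example value, and a production version would also flush on a timer.

```javascript
// Buffer readings locally and flush them in batches.
class BatchBuffer {
  constructor(flushFn, maxSize = 50) {
    this.flushFn = flushFn; // called with an array of buffered points
    this.maxSize = maxSize;
    this.buffer = [];
  }

  add(point) {
    this.buffer.push(point);
    if (this.buffer.length >= this.maxSize) this.flush();
  }

  flush() {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.flushFn(batch);
  }
}
```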
### Security

- **Authentication**: Secure all data sources and endpoints
- **Encryption**: Encrypt data in transit and at rest
- **Access controls**: Limit who can create and modify streams
- **Audit logging**: Track all stream operations and data access
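In support of the audit-logging point, redact secret fields before stream configurations are written to logs. The field names below are illustrative; extend the list for your own config shape.

```javascript
// Recursively replace known secret fields with a placeholder before logging.
function redactSecrets(config, secretKeys = ['password', 'secret', 'apiSecret', 'token']) {
  return Object.fromEntries(
    Object.entries(config).map(([key, value]) => {
      if (secretKeys.includes(key)) return [key, '***'];
      if (value && typeof value === 'object' && !Array.isArray(value)) {
        return [key, redactSecrets(value, secretKeys)];
      }
      return [key, value];
    })
  );
}
```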
## Troubleshooting

### Common Issues

#### Stream Not Receiving Data

- **Check source connectivity**: Verify the endpoint is reachable
- **Validate authentication**: Ensure credentials are correct
- **Review schema validation**: Check whether incoming data matches the expected format
- **Monitor error logs**: Look for specific error messages
High Data Volume Performance
- Batch data ingestion - Send multiple data points together
- Optimize processing frequency - Reduce processing frequency if possible
- Use data sampling - Process subset of high-frequency data
- Scale infrastructure - Increase processing capacity
Data Quality Issues
- Implement validation rules - Add stricter schema validation
- Data cleansing - Add transformation functions to clean data
- Anomaly detection - Automatically flag unusual data points
- Source quality monitoring - Monitor data source health
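A stricter validation rule can be expressed against the same schema shape used throughout this page (`type`, `range`, `required`). This is a minimal sketch of client-side pre-validation, not the platform's actual validation engine.

```javascript
// Validate one data point against a simple schema and return a list of errors.
function validatePoint(point, schema) {
  const errors = [];
  for (const [field, rules] of Object.entries(schema)) {
    const value = point[field];
    if (value === undefined) {
      if (rules.required) errors.push(`${field} is required`);
      continue;
    }
    if (rules.type === 'number' && typeof value !== 'number') {
      errors.push(`${field} must be a number`);
    }
    if (rules.range && (value < rules.range[0] || value > rules.range[1])) {
      errors.push(`${field} out of range [${rules.range[0]}, ${rules.range[1]}]`);
    }
  }
  return errors;
}
```

Rejecting (or quarantining) points that fail this check before ingestion keeps bad readings out of the verified history.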
### Monitoring and Alerting

```javascript
// Comprehensive stream monitoring
const monitoringConfig = {
  streamId: "stream_xyz789",
  healthChecks: [
    {
      name: "data_freshness",
      check: "last_data_received < 5m",
      severity: "warning"
    },
    {
      name: "error_rate",
      check: "error_rate < 5%",
      severity: "critical"
    },
    {
      name: "processing_lag",
      check: "processing_lag < 30s",
      severity: "warning"
    }
  ],
  notifications: {
    email: ["ops@company.com"],
    slack: "#monitoring",
    webhook: "https://api.company.com/alerts"
  }
};

await filedgr.streams.setMonitoring("stream_xyz789", monitoringConfig);
```
## Next Steps

- **Learn Verification**: Understand how stream data is verified
- **Set Up Sharing**: Share stream data with stakeholders
- **Explore Webhooks**: React to stream events in real time
- **Integration Examples**: See real-world implementations

Data streams bridge the gap between static records and dynamic reality, keeping your digital twins current and actionable. Start streaming data today to unlock the full potential of real-time verification.