Data Streams

⚠️ Future Content: This page is an early preview. Enterprise-grade streaming integrations and high-throughput data processing patterns are still in development, and the documentation below will be expanded in future updates.

Data streams enable real-time data integration with your digital twins, allowing continuous updates from IoT devices, APIs, databases, and other external systems.

Overview

Data streams are continuous flows of information that automatically integrate into vaults, keeping digital twins current and accurate. They bridge the gap between static documents and dynamic, real-world data.

Key Features

  • Real-time Updates - Data flows continuously as it's generated
  • Multiple Sources - IoT devices, APIs, databases, webhooks
  • Automatic Verification - Each data point cryptographically verified
  • Flexible Schemas - Support structured and unstructured data
  • Scalable Processing - Handle high-frequency data streams
  • Historical Storage - Complete data history with timestamps

Stream Types

IoT Sensor Streams

Direct integration with IoT devices and sensor networks:

const sensorStream = {
  name: "Temperature Monitoring",
  type: "iot_sensor",
  vaultId: "vault_abc123",

  source: {
    deviceId: "TEMP-SENSOR-001",
    protocol: "mqtt",
    endpoint: "mqtt://iot.company.com:1883",
    topic: "sensors/temperature/facility-a",
    credentials: {
      username: "sensor_user",
      password: "sensor_pass"
    }
  },

  schema: {
    temperature: { type: "number", unit: "celsius", range: [-40, 100] },
    humidity: { type: "number", unit: "percent", range: [0, 100] },
    timestamp: { type: "datetime", required: true },
    location: { type: "string", enum: ["warehouse", "office", "production"] },
    batteryLevel: { type: "number", unit: "percent", range: [0, 100] }
  },

  processing: {
    frequency: "1m",        // Process every minute
    aggregation: "average", // Average values within each time window
    validation: "strict",   // Validate against the schema
    alertThresholds: {
      temperature: { min: 18, max: 26 },
      humidity: { min: 40, max: 60 }
    }
  }
};

const stream = await filedgr.streams.create(sensorStream);

API Data Streams

Pull data from REST APIs, databases, or other web services:

const apiStream = {
  name: "Financial Data Feed",
  type: "api_pull",
  vaultId: "vault_financial_001",

  source: {
    endpoint: "https://api.company.com/financial/accounts",
    method: "GET",
    headers: {
      "Authorization": "Bearer ${API_TOKEN}",
      "Content-Type": "application/json"
    },
    schedule: "0 */6 * * *", // Every 6 hours
    timeout: 30000
  },

  schema: {
    accountId: { type: "string", required: true },
    balance: { type: "number", required: true },
    currency: { type: "string", required: true },
    lastTransaction: { type: "datetime" },
    status: { type: "string", enum: ["active", "frozen", "closed"] }
  },

  processing: {
    transform: `
      // Transform the API response into the stream format
      function transform(apiData) {
        return apiData.accounts.map(account => ({
          accountId: account.id,
          balance: account.current_balance,
          currency: account.currency_code,
          lastTransaction: account.last_transaction_date,
          status: account.status.toLowerCase(),
          timestamp: new Date().toISOString()
        }));
      }
    `,
    validation: "strict",
    deduplicate: true
  }
};

Event Streams

Capture discrete events and state changes:

const eventStream = {
  name: "Manufacturing Events",
  type: "event_stream",
  vaultId: "vault_manufacturing_001",

  source: {
    type: "webhook",
    endpoint: "https://webhook.filedgr.io/streams/${streamId}",
    authentication: "hmac-sha256",
    secret: "${WEBHOOK_SECRET}"
  },

  eventTypes: {
    production_started: {
      batchId: { type: "string", required: true },
      productType: { type: "string", required: true },
      quantity: { type: "number", min: 1 },
      startTime: { type: "datetime", required: true },
      operator: { type: "string" }
    },

    quality_check: {
      batchId: { type: "string", required: true },
      testType: { type: "string", required: true },
      result: { type: "string", enum: ["pass", "fail", "conditional"] },
      metrics: { type: "object" },
      inspector: { type: "string" },
      timestamp: { type: "datetime", required: true }
    },

    batch_completed: {
      batchId: { type: "string", required: true },
      completionTime: { type: "datetime", required: true },
      finalQuantity: { type: "number", min: 0 },
      qualityGrade: { type: "string", enum: ["A", "B", "C"] },
      notes: { type: "string" }
    }
  }
};

Database Streams

Sync with existing database systems:

const dbStream = {
  name: "Inventory Sync",
  type: "database",
  vaultId: "vault_inventory_001",

  source: {
    type: "postgresql",
    connection: {
      host: "db.company.com",
      port: 5432,
      database: "inventory",
      username: "${DB_USER}",
      password: "${DB_PASS}",
      ssl: true
    },

    query: `
      SELECT
        product_id,
        quantity_on_hand,
        location,
        last_updated,
        reorder_level,
        supplier_id
      FROM inventory
      WHERE last_updated > $1
      ORDER BY last_updated ASC
    `,

    schedule: "*/15 * * * *", // Every 15 minutes
    parameters: ["${lastProcessedTime}"],
    incrementalColumn: "last_updated"
  },

  schema: {
    productId: { type: "string", required: true },
    quantityOnHand: { type: "number", min: 0 },
    location: { type: "string", required: true },
    lastUpdated: { type: "datetime", required: true },
    reorderLevel: { type: "number", min: 0 },
    supplierId: { type: "string" }
  }
};
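The `incrementalColumn` and `${lastProcessedTime}` parameter imply a polling loop that fetches only rows newer than a checkpoint, then advances it. A sketch of that checkpoint logic; the function name is illustrative and the row shape mirrors the query:

```javascript
// Advance the sync checkpoint after each polling run. Rows arrive
// ordered by last_updated ASC, so the final row carries the newest value.
function advanceCheckpoint(rows, lastProcessedTime) {
  if (rows.length === 0) return lastProcessedTime; // no new rows: keep checkpoint
  return rows[rows.length - 1].last_updated;
}

const rows = [
  { product_id: 'P-1001', last_updated: '2024-01-15T10:00:00Z' },
  { product_id: 'P-1002', last_updated: '2024-01-15T10:05:00Z' }
];
const nextCheckpoint = advanceCheckpoint(rows, '2024-01-15T09:45:00Z');
// nextCheckpoint === '2024-01-15T10:05:00Z'
```

Persist the checkpoint durably between runs; restarting from an in-memory value would re-ingest or skip rows.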

Creating Data Streams

Basic Stream Creation

curl -X POST "https://api.filedgr.io/streams" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Temperature Monitoring",
    "vaultId": "vault_abc123",
    "type": "iot_sensor",
    "source": {
      "deviceId": "TEMP-001",
      "protocol": "mqtt",
      "endpoint": "mqtt://sensors.company.com:1883",
      "topic": "facility/temperature"
    },
    "schema": {
      "temperature": {"type": "number", "unit": "celsius"},
      "timestamp": {"type": "datetime", "required": true}
    }
  }'

Response:

{
  "streamId": "stream_xyz789",
  "name": "Temperature Monitoring",
  "vaultId": "vault_abc123",
  "type": "iot_sensor",
  "status": "created",
  "endpoint": "https://webhook.filedgr.io/streams/stream_xyz789",
  "createdAt": "2024-01-15T10:30:00Z"
}

SDK Usage

import { Filedgr } from '@filedgr/sdk';

const filedgr = new Filedgr({
  apiKey: process.env.FILEDGR_API_KEY,
  apiSecret: process.env.FILEDGR_API_SECRET
});

// Create an IoT sensor stream
const sensorStream = await filedgr.streams.create({
  name: "Environmental Monitoring",
  vaultId: "vault_facility_001",
  type: "iot_sensor",

  source: {
    deviceId: "ENV-SENSOR-001",
    protocol: "http",
    endpoint: "https://api.filedgr.io/streams/ingest",
    authentication: "api_key"
  },

  schema: {
    temperature: { type: "number", unit: "celsius" },
    humidity: { type: "number", unit: "percent" },
    co2: { type: "number", unit: "ppm" },
    timestamp: { type: "datetime", required: true }
  },

  processing: {
    frequency: "30s",
    aggregation: "latest",
    alerting: {
      co2: { threshold: 1000, condition: "greater_than" }
    }
  }
});

console.log(`Stream created: ${sensorStream.id}`);

Data Ingestion

Direct HTTP Ingestion

# Send data to the stream endpoint
curl -X POST "https://api.filedgr.io/streams/stream_xyz789/data" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "timestamp": "2024-01-15T10:30:00Z",
    "temperature": 23.5,
    "humidity": 65.2,
    "location": "warehouse-section-a"
  }'

Batch Data Ingestion

// Send multiple data points efficiently
const batchData = [
  { timestamp: "2024-01-15T10:30:00Z", temperature: 23.5, humidity: 65.2, co2: 425 },
  { timestamp: "2024-01-15T10:31:00Z", temperature: 23.7, humidity: 64.8, co2: 430 },
  { timestamp: "2024-01-15T10:32:00Z", temperature: 23.4, humidity: 65.5, co2: 435 }
];

await filedgr.streams.ingestBatch("stream_xyz789", batchData);

MQTT Integration

// MQTT client for IoT devices
const mqtt = require('mqtt');

class IoTStreamClient {
  constructor(streamId, deviceId) {
    this.streamId = streamId;
    this.deviceId = deviceId;
    this.client = null;
  }

  async connect() {
    this.client = mqtt.connect('mqtt://iot.company.com:1883', {
      clientId: this.deviceId,
      username: 'device_user',
      password: 'device_pass'
    });

    this.client.on('connect', () => {
      console.log(`Device ${this.deviceId} connected`);
      this.startSending();
    });

    this.client.on('error', (err) => {
      console.error(`Device ${this.deviceId} connection error:`, err.message);
    });
  }

  startSending() {
    setInterval(() => {
      const data = {
        deviceId: this.deviceId,
        temperature: 20 + Math.random() * 10,
        humidity: 40 + Math.random() * 40,
        timestamp: new Date().toISOString()
      };

      this.client.publish(
        `sensors/data/${this.deviceId}`,
        JSON.stringify(data)
      );
    }, 30000); // Every 30 seconds
  }
}

// Usage
const iotClient = new IoTStreamClient("stream_xyz789", "TEMP-001");
await iotClient.connect();

Stream Management

Monitor Stream Status

curl -X GET "https://api.filedgr.io/streams/stream_xyz789/status" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

Response:

{
  "streamId": "stream_xyz789",
  "status": "active",
  "lastDataReceived": "2024-01-15T10:45:30Z",
  "dataPointsToday": 1440,
  "errorCount": 0,
  "health": {
    "status": "healthy",
    "uptime": "99.8%",
    "avgLatency": "125ms"
  },
  "processing": {
    "queueLength": 5,
    "processingRate": "95.2/min",
    "lastProcessed": "2024-01-15T10:45:28Z"
  }
}

Update Stream Configuration

// Modify stream settings
await filedgr.streams.update("stream_xyz789", {
  processing: {
    frequency: "1m",        // Changed from 30s to 1m
    aggregation: "average", // Changed from latest to average
    alerting: {
      temperature: {
        min: 18,
        max: 28,
        condition: "outside_range"
      }
    }
  }
});

Pause and Resume Streams

# Pause stream processing
curl -X POST "https://api.filedgr.io/streams/stream_xyz789/pause" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

# Resume stream processing
curl -X POST "https://api.filedgr.io/streams/stream_xyz789/resume" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

Data Processing

Real-time Transformations

const streamProcessor = {
  name: "Environmental Data Processor",
  streamId: "stream_xyz789",

  transformations: [
    {
      name: "unit_conversion",
      function: `
        function convertUnits(data) {
          // Convert Fahrenheit to Celsius if needed
          if (data.temperatureUnit === 'F') {
            data.temperature = (data.temperature - 32) * 5 / 9;
            data.temperatureUnit = 'C';
          }
          return data;
        }
      `
    },
    {
      name: "quality_scoring",
      function: `
        function calculateQuality(data) {
          let score = 100;

          // Penalize extreme temperatures
          if (data.temperature < 18 || data.temperature > 26) {
            score -= 20;
          }

          // Penalize high CO2
          if (data.co2 > 1000) {
            score -= 30;
          }

          data.qualityScore = score;
          return data;
        }
      `
    }
  ],

  aggregations: [
    {
      name: "hourly_averages",
      timeWindow: "1h",
      fields: ["temperature", "humidity", "co2"],
      function: "average"
    },
    {
      name: "daily_extremes",
      timeWindow: "1d",
      fields: ["temperature"],
      functions: ["min", "max"]
    }
  ]
};

await filedgr.streams.setProcessor("stream_xyz789", streamProcessor);

Alerting and Notifications

const alertConfig = {
  streamId: "stream_xyz789",

  rules: [
    {
      name: "high_temperature_alert",
      condition: "temperature > 30",
      severity: "critical",
      cooldown: "15m", // Don't repeat for 15 minutes

      actions: [
        {
          type: "email",
          recipients: ["facilities@company.com"],
          template: "high_temperature",
          data: {
            location: "{{data.location}}",
            temperature: "{{data.temperature}}"
          }
        },
        {
          type: "webhook",
          url: "https://api.company.com/alerts",
          payload: {
            type: "environmental",
            severity: "critical",
            message: "Temperature exceeded safe threshold"
          }
        }
      ]
    },
    {
      name: "equipment_maintenance_due",
      condition: "equipmentHours > maintenanceThreshold",
      severity: "warning",

      actions: [
        {
          type: "create_ticket",
          system: "maintenance_system",
          priority: "medium",
          assignee: "maintenance_team"
        }
      ]
    }
  ]
};

await filedgr.streams.setAlerts("stream_xyz789", alertConfig);

Query and Analytics

Historical Data Queries

# Query historical data
curl -X GET "https://api.filedgr.io/streams/stream_xyz789/data" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -G \
  -d "start=2024-01-01T00:00:00Z" \
  -d "end=2024-01-31T23:59:59Z" \
  -d "fields=temperature,humidity" \
  -d "aggregation=hourly" \
  -d "limit=1000"
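The `-G -d` flags above URL-encode each pair into the query string. The same request URL can be built programmatically; a sketch whose parameter names simply mirror the curl example:

```javascript
// Build the query string for the historical-data endpoint.
function buildDataQuery(params) {
  return new URLSearchParams(params).toString();
}

const url = "https://api.filedgr.io/streams/stream_xyz789/data?" + buildDataQuery({
  start: "2024-01-01T00:00:00Z",
  end: "2024-01-31T23:59:59Z",
  fields: "temperature,humidity",
  aggregation: "hourly",
  limit: "1000"
});
```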

Advanced Analytics

// Perform analytics on stream data
const analytics = await filedgr.streams.analyze("stream_xyz789", {
  timeRange: {
    start: "2024-01-01T00:00:00Z",
    end: "2024-01-31T23:59:59Z"
  },

  analyses: [
    {
      type: "trend_analysis",
      field: "temperature",
      algorithm: "linear_regression"
    },
    {
      type: "anomaly_detection",
      fields: ["temperature", "humidity"],
      sensitivity: 0.8
    },
    {
      type: "correlation_analysis",
      fields: ["temperature", "humidity", "co2"],
      method: "pearson"
    },
    {
      type: "forecasting",
      field: "temperature",
      horizon: "7d",
      model: "arima"
    }
  ]
});

console.log('Analytics Results:', {
  trends: analytics.trends,
  anomalies: analytics.anomalies,
  correlations: analytics.correlations,
  forecast: analytics.forecast
});

Real-time Dashboards

// Create a real-time dashboard
const dashboard = {
  name: "Facility Monitoring Dashboard",
  streamIds: ["stream_xyz789", "stream_abc456"],

  widgets: [
    {
      type: "line_chart",
      title: "Temperature Trends",
      streamId: "stream_xyz789",
      field: "temperature",
      timeWindow: "24h",
      refreshInterval: "30s"
    },
    {
      type: "gauge",
      title: "Current Humidity",
      streamId: "stream_xyz789",
      field: "humidity",
      ranges: [
        { min: 0, max: 30, color: "red", label: "Too Low" },
        { min: 30, max: 70, color: "green", label: "Optimal" },
        { min: 70, max: 100, color: "red", label: "Too High" }
      ]
    },
    {
      type: "alert_panel",
      title: "Active Alerts",
      streamIds: ["stream_xyz789", "stream_abc456"],
      severities: ["critical", "warning"]
    }
  ],

  layout: {
    columns: 3,
    responsive: true
  }
};

const dashboardUrl = await filedgr.dashboards.create(dashboard);
console.log(`Dashboard available at: ${dashboardUrl}`);

Integration Examples

Manufacturing Line Integration

// Monitor a production line in real time
class ProductionLineStream {
  // Assumed maximum line speed for the efficiency calculation (units/min)
  static MAX_LINE_SPEED = 100;

  constructor(lineId, vaultId) {
    this.lineId = lineId;
    this.vaultId = vaultId;
    this.streamId = null;
  }

  async initialize() {
    // Create the production monitoring stream
    const stream = await filedgr.streams.create({
      name: `Production Line ${this.lineId}`,
      vaultId: this.vaultId,
      type: "manufacturing",

      source: {
        type: "plc_integration",
        endpoint: `plc://line-${this.lineId}.factory.com:502`,
        protocol: "modbus_tcp",
        registers: {
          production_count: 40001,
          line_speed: 40002,
          quality_status: 40003,
          temperature: 40004,
          pressure: 40005
        }
      },

      schema: {
        productionCount: { type: "number", min: 0 },
        lineSpeed: { type: "number", unit: "units/min" },
        qualityStatus: { type: "string", enum: ["good", "warning", "error"] },
        temperature: { type: "number", unit: "celsius" },
        pressure: { type: "number", unit: "bar" },
        timestamp: { type: "datetime", required: true }
      },

      processing: {
        frequency: "10s",
        validation: "strict",
        alerting: {
          qualityStatus: {
            condition: "equals",
            value: "error",
            severity: "critical"
          },
          lineSpeed: {
            condition: "less_than",
            value: 50,
            severity: "warning"
          }
        }
      }
    });

    this.streamId = stream.id;
    return stream;
  }

  async getProductionMetrics(timeframe = "1d") {
    const data = await filedgr.streams.query(this.streamId, {
      timeRange: timeframe,
      aggregation: {
        productionCount: "sum",
        lineSpeed: "average",
        qualityStatus: "mode"
      }
    });

    return {
      totalProduction: data.productionCount,
      averageSpeed: data.lineSpeed,
      qualityScore: this.calculateQualityScore(data.qualityStatus),
      efficiency: (data.lineSpeed / ProductionLineStream.MAX_LINE_SPEED) * 100
    };
  }

  calculateQualityScore(statusData) {
    const total = statusData.good + statusData.warning + statusData.error;
    if (total === 0) return 0; // no readings yet; avoid division by zero
    return ((statusData.good + statusData.warning * 0.5) / total) * 100;
  }
}

Smart Building Integration

// Comprehensive building management system
class SmartBuildingStream {
  constructor(buildingId, vaultId) {
    this.buildingId = buildingId;
    this.vaultId = vaultId;
    this.streams = new Map();
  }

  async initializeStreams() {
    // HVAC system stream
    const hvacStream = await filedgr.streams.create({
      name: "HVAC Monitoring",
      vaultId: this.vaultId,
      type: "building_systems",

      source: {
        type: "bacnet",
        device: `BAC-HVAC-${this.buildingId}`,
        points: [
          "Zone1.Temperature",
          "Zone1.Humidity",
          "Zone1.Setpoint",
          "System.Energy.Consumption",
          "System.Status"
        ]
      },

      processing: {
        frequency: "5m",
        alerting: {
          energyConsumption: {
            condition: "greater_than",
            baseline: "historical_average * 1.2"
          }
        }
      }
    });

    // Occupancy stream
    const occupancyStream = await filedgr.streams.create({
      name: "Occupancy Monitoring",
      vaultId: this.vaultId,
      type: "occupancy",

      source: {
        type: "mqtt",
        endpoint: "mqtt://building.sensors.com:1883",
        topics: [
          `building/${this.buildingId}/occupancy/+/count`,
          `building/${this.buildingId}/access/+/events`
        ]
      },

      schema: {
        zone: { type: "string", required: true },
        occupantCount: { type: "number", min: 0 },
        accessEvents: { type: "array" },
        timestamp: { type: "datetime", required: true }
      }
    });

    // Energy management stream
    const energyStream = await filedgr.streams.create({
      name: "Energy Management",
      vaultId: this.vaultId,
      type: "energy",

      source: {
        type: "api_pull",
        endpoint: `https://api.energy.com/buildings/${this.buildingId}/meters`,
        schedule: "*/15 * * * *", // Every 15 minutes
        authentication: "bearer_token"
      },

      processing: {
        transformations: [
          {
            name: "cost_calculation",
            function: `
              function calculateCost(data) {
                const rateSchedule = {
                  peak: 0.15,   // $0.15/kWh during peak hours
                  offPeak: 0.08 // $0.08/kWh during off-peak
                };

                const hour = new Date(data.timestamp).getHours();
                const isPeak = hour >= 9 && hour <= 17;
                const rate = isPeak ? rateSchedule.peak : rateSchedule.offPeak;

                data.cost = data.consumption * rate;
                data.rateType = isPeak ? 'peak' : 'offPeak';

                return data;
              }
            `
          }
        ]
      }
    });

    this.streams.set("hvac", hvacStream.id);
    this.streams.set("occupancy", occupancyStream.id);
    this.streams.set("energy", energyStream.id);
  }

  async getBuildingInsights(timeframe = "1d") {
    const insights = {};

    // HVAC efficiency
    insights.hvac = await filedgr.streams.analyze(this.streams.get("hvac"), {
      timeRange: timeframe,
      analyses: ["energy_efficiency", "comfort_score"]
    });

    // Occupancy patterns
    insights.occupancy = await filedgr.streams.analyze(this.streams.get("occupancy"), {
      timeRange: timeframe,
      analyses: ["usage_patterns", "peak_hours"]
    });

    // Energy optimization
    insights.energy = await filedgr.streams.analyze(this.streams.get("energy"), {
      timeRange: timeframe,
      analyses: ["consumption_trends", "cost_optimization"]
    });

    return insights;
  }
}

Best Practices

Stream Design

  • Single responsibility - Each stream should track one specific data source
  • Appropriate frequency - Balance real-time needs with resource usage
  • Schema validation - Define clear data structure and validation rules
  • Error handling - Plan for network failures and data quality issues

Data Quality

  • Input validation - Validate data at ingestion point
  • Duplicate detection - Handle duplicate data points appropriately
  • Missing data handling - Define how to handle gaps in data
  • Quality scoring - Implement data quality metrics and monitoring

Performance

  • Batch processing - Group multiple data points for efficiency
  • Appropriate storage - Choose storage based on access patterns
  • Caching strategy - Cache frequently accessed data
  • Resource monitoring - Monitor CPU, memory, and network usage
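The batch-processing point can be implemented with a small buffer that flushes once it reaches a target size. A sketch; the flush callback would call something like `streams.ingestBatch`:

```javascript
// Accumulate points and hand them to `flush` in groups of `maxSize`.
class BatchBuffer {
  constructor(flush, maxSize = 50) {
    this.flush = flush;
    this.maxSize = maxSize;
    this.points = [];
  }

  add(point) {
    this.points.push(point);
    if (this.points.length >= this.maxSize) this.drain();
  }

  // Flush whatever is buffered, e.g. on shutdown or on a timer.
  drain() {
    if (this.points.length === 0) return;
    this.flush(this.points.splice(0));
  }
}
```

Pairing the size trigger with a periodic `drain()` timer keeps latency bounded when traffic is slow.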

Security

  • Authentication - Secure all data sources and endpoints
  • Encryption - Encrypt data in transit and at rest
  • Access controls - Limit who can create and modify streams
  • Audit logging - Track all stream operations and data access

Troubleshooting

Common Issues

Stream Not Receiving Data

  1. Check source connectivity - Verify endpoint is reachable
  2. Validate authentication - Ensure credentials are correct
  3. Review schema validation - Check if data matches expected format
  4. Monitor error logs - Look for specific error messages

High Data Volume Performance

  1. Batch data ingestion - Send multiple data points together
  2. Optimize processing frequency - Reduce processing frequency if possible
  3. Use data sampling - Process subset of high-frequency data
  4. Scale infrastructure - Increase processing capacity
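Data sampling (step 3 above) can be a simple keep-one-in-N filter when full fidelity is not needed. A sketch; N is illustrative:

```javascript
// Returns a predicate that keeps every Nth data point.
function makeSampler(n) {
  let count = 0;
  return () => count++ % n === 0;
}

const shouldProcess = makeSampler(10); // keep 1 of every 10 points
```

For bursty traffic, time-based sampling (keep at most one point per interval) often behaves more predictably than count-based sampling.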

Data Quality Issues

  1. Implement validation rules - Add stricter schema validation
  2. Data cleansing - Add transformation functions to clean data
  3. Anomaly detection - Automatically flag unusual data points
  4. Source quality monitoring - Monitor data source health

Monitoring and Alerting

// Comprehensive stream monitoring
const monitoringConfig = {
  streamId: "stream_xyz789",

  healthChecks: [
    {
      name: "data_freshness",
      check: "last_data_received < 5m",
      severity: "warning"
    },
    {
      name: "error_rate",
      check: "error_rate < 5%",
      severity: "critical"
    },
    {
      name: "processing_lag",
      check: "processing_lag < 30s",
      severity: "warning"
    }
  ],

  notifications: {
    email: ["ops@company.com"],
    slack: "#monitoring",
    webhook: "https://api.company.com/alerts"
  }
};

await filedgr.streams.setMonitoring("stream_xyz789", monitoringConfig);

Next Steps

Data streams bridge the gap between static records and dynamic reality, enabling your digital twins to stay current and actionable. Start streaming data today and unlock the full potential of real-time verification.