Stream API Code Examples
This guide provides practical code examples for working with the DBConvert Streams Stream API in various programming languages. For the complete API reference, see the Stream API Reference.
Prerequisites
Before using these examples, ensure you have:
- A valid DBConvert Streams API key
- The API server running (default: http://127.0.0.1:8020/api/v1)
- At least one configured source and target connection (see Connection API Reference)
- Required language-specific dependencies:
  - Python: requests library (pip install requests)
  - Node.js: axios library (npm install axios)
  - PowerShell: Version 5.1 or later
Important
Before using any connection or stream endpoints, you must first call the /user/configs endpoint to load existing configurations:
# Using curl
curl -X GET "http://127.0.0.1:8020/api/v1/user/configs" \
-H "X-API-Key: your_api_key_here"
This endpoint loads all user configurations including existing connections and stream configurations. Make sure to call this endpoint:
- When starting your application
- Before listing or managing connections
- Before creating or managing streams
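One way to avoid forgetting this step is to route every request through a small wrapper that loads the configurations first. The sketch below is illustrative only: `ConfigGate` and the `Transport` callable are hypothetical names, not part of the DBConvert Streams API, and it assumes that one successful call to /user/configs per client session is enough, which this guide does not state explicitly.

```python
from typing import Any, Callable, Dict

# Hypothetical transport: takes (method, path) and returns the parsed JSON body.
Transport = Callable[[str, str], Dict[str, Any]]


class ConfigGate:
    """Calls GET /user/configs once before the first other request."""

    CONFIGS_PATH = "/user/configs"

    def __init__(self, send: Transport) -> None:
        self._send = send
        self._loaded = False

    def request(self, method: str, path: str) -> Dict[str, Any]:
        # An explicit call to /user/configs also counts as loading.
        if path == self.CONFIGS_PATH:
            result = self._send(method, path)
            self._loaded = True
            return result
        # Any other endpoint triggers the load first (assumed: once per session).
        if not self._loaded:
            self._send("GET", self.CONFIGS_PATH)
            self._loaded = True
        return self._send(method, path)
```

In real use, `send` would wrap an HTTP client that adds the X-API-Key header; passing it in as a callable keeps the ordering logic easy to check without a running server.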
Understanding ID Lifecycle
When working with streams, you'll deal with different types of IDs that represent separate entities:
Connection IDs - Used to identify database connections:
conn_2sZyDMeIbdaXnN9OL7vYzzAiZU5 # Source connection
conn_2sZyBc83hypbiTMrNTotKPyjwFC # Target connection
Stream Configuration IDs - Templates for creating streams:
config_2sZyDMeIbdaXnN9OL7vYzzAiZU5 # Reusable stream configuration
Stream IDs - Independent stream instances:
stream_2sZyDMeIbdaXnN9OL7vYzzAiZU5 # A running stream instance
stream_3tZyDMeIbdaXnN9OL7vYzzBiZU6 # Another stream from the same config
TIP
Stream configurations are templates that can be used to create multiple independent streams. Each stream gets its own unique stream_ ID, separate from the configuration that created it.
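Because the three ID types share the same overall shape, passing the wrong kind to an endpoint is an easy mistake. The following sketch tells them apart by prefix before a request is made. The helper names `classify_id` and `require_kind` are hypothetical, and the check relies only on the conn_, config_, and stream_ prefixes shown above; it makes no assumption about the rest of the ID.

```python
from typing import Optional

# Prefixes taken from the ID examples in this guide.
ID_PREFIXES = {
    "conn_": "connection",
    "config_": "stream configuration",
    "stream_": "stream",
}


def classify_id(value: str) -> Optional[str]:
    """Return the entity kind for an ID, or None if the prefix is unknown."""
    for prefix, kind in ID_PREFIXES.items():
        # Require at least one character after the prefix.
        if value.startswith(prefix) and len(value) > len(prefix):
            return kind
    return None


def require_kind(value: str, expected: str) -> str:
    """Return the ID unchanged, or raise ValueError if it is the wrong kind."""
    kind = classify_id(value)
    if kind != expected:
        raise ValueError(f"expected a {expected} ID, got {value!r} ({kind})")
    return value
```

For example, calling `require_kind(stream_id, "stream")` before requesting statistics would catch a configuration ID passed by mistake.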
Language Examples
Choose your preferred language:
- curl - For quick command-line testing
- Python - Using the requests library
- Node.js - Using axios with async/await
- PowerShell - Using PowerShell classes
Each example demonstrates a complete workflow:
- Creating source and target connections
- Testing the connections
- Creating a stream configuration
- Starting multiple streams from the configuration
- Managing running streams independently
Complete Workflow Example
Here's a complete workflow showing how to:
- Create source and target connections (getting conn_ IDs)
- Test the connections
- Create a stream configuration (getting a config_ ID)
- Create multiple streams from the configuration (getting stream_ IDs)
- Manage the running streams independently
Using curl
Complete command-line examples for testing and managing streams (the ID extraction steps require jq):
# Set your API key and base URL
export API_KEY="your_api_key_here"
export API_URL="http://127.0.0.1:8020/api/v1"
# Load user configurations (REQUIRED)
curl -X GET "$API_URL/user/configs" \
-H "X-API-Key: $API_KEY"
# 1. Create source MySQL connection
source_conn=$(curl -X POST "$API_URL/connections" \
-H "X-API-Key: $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-source",
"type": "mysql",
"host": "localhost",
"port": 3306,
"username": "root",
"password": "your_password",
"database": "source_db"
}')
# Extract source connection ID
source_id=$(echo "$source_conn" | jq -r '.id')
echo "Source connection ID: $source_id" # conn_...
# 2. Create target PostgreSQL connection
target_conn=$(curl -X POST "$API_URL/connections" \
-H "X-API-Key: $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-target",
"type": "postgresql",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "your_password",
"database": "target_db"
}')
# Extract target connection ID
target_id=$(echo "$target_conn" | jq -r '.id')
echo "Target connection ID: $target_id" # conn_...
# 3. Test both connections
curl -X POST "$API_URL/connections/$source_id/ping" \
-H "X-API-Key: $API_KEY"
curl -X POST "$API_URL/connections/$target_id/ping" \
-H "X-API-Key: $API_KEY"
# 4. Create a stream configuration (template)
stream_config=$(curl -X POST "$API_URL/stream-configs" \
-H "X-API-Key: $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "mysql_to_postgresql",
"source": "'"$source_id"'",
"target": "'"$target_id"'",
"mode": "cdc",
"dataBundleSize": 40,
"reportingIntervals": {
"source": 3,
"target": 3
},
"tables": [
{
"name": "users",
"operations": ["insert", "update", "delete"]
}
],
"createStructure": true
}')
# Extract configuration ID
config_id=$(echo "$stream_config" | jq -r '.id')
echo "Stream configuration ID: $config_id" # config_...
# 5. Create multiple streams from the same configuration
# Start first stream
stream1_start=$(curl -X POST "$API_URL/streams/$config_id/start" \
-H "X-API-Key: $API_KEY")
# Extract first stream ID
stream1_id=$(echo "$stream1_start" | jq -r '.id')
echo "First stream ID: $stream1_id" # stream_...
# Start second stream from the same configuration
stream2_start=$(curl -X POST "$API_URL/streams/$config_id/start" \
-H "X-API-Key: $API_KEY")
# Extract second stream ID
stream2_id=$(echo "$stream2_start" | jq -r '.id')
echo "Second stream ID: $stream2_id" # stream_...
# 6. Manage streams independently
# Get statistics for first stream
curl -X GET "$API_URL/streams/$stream1_id/stats" \
-H "X-API-Key: $API_KEY"
# Pause first stream
curl -X POST "$API_URL/streams/$stream1_id/pause" \
-H "X-API-Key: $API_KEY"
# Get statistics for second stream
curl -X GET "$API_URL/streams/$stream2_id/stats" \
-H "X-API-Key: $API_KEY"
# Stop second stream
curl -X POST "$API_URL/streams/$stream2_id/stop" \
-H "X-API-Key: $API_KEY"
# The configuration remains available for creating more streams
echo "Configuration $config_id can be used to create more streams"
Using Python
Here's a Python example that creates and tests the connections, then creates a stream configuration from them:
import requests
from dataclasses import dataclass, asdict
from typing import Optional, Dict, List, Any
@dataclass
class ConnectionConfig:
    name: str
    type: str
    host: str
    port: int
    username: str
    password: str
    database: str
    schema: Optional[str] = None
    def to_payload(self) -> Dict[str, Any]:
        """Return the request body, omitting unset optional fields (as in the curl example)."""
        return {key: value for key, value in asdict(self).items() if value is not None}
@dataclass
class StreamConfig:
    name: str
    source: str  # Connection ID
    target: str  # Connection ID
    mode: str
    tables: List[Dict[str, Any]]
    data_bundle_size: int = 40
    reporting_intervals: Optional[Dict[str, int]] = None
    create_structure: bool = True
    def to_payload(self) -> Dict[str, Any]:
        """Return the request body with the camelCase keys used in the curl example."""
        payload: Dict[str, Any] = {
            "name": self.name,
            "source": self.source,
            "target": self.target,
            "mode": self.mode,
            "tables": self.tables,
            "dataBundleSize": self.data_bundle_size,
            "createStructure": self.create_structure,
        }
        if self.reporting_intervals is not None:
            payload["reportingIntervals"] = self.reporting_intervals
        return payload
class DBConvertAPI:
    """Combined client for DBConvert Streams API."""
    def __init__(self, api_key: str, base_url: str = "http://127.0.0.1:8020/api/v1"):
        self.base_url = base_url
        self.headers = {
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        }
    def load_user_configs(self) -> bool:
        """Load user configurations (REQUIRED before using other endpoints)."""
        try:
            response = requests.get(
                f"{self.base_url}/user/configs",
                headers=self.headers
            )
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error loading user configurations: {e}")
            return False
    def create_connection(self, config: ConnectionConfig) -> Optional[Dict[str, Any]]:
        """Create a new database connection."""
        try:
            response = requests.post(
                f"{self.base_url}/connections",
                headers=self.headers,
                json=config.to_payload()
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error creating connection: {e}")
            return None
    def test_connection(self, conn_id: str) -> bool:
        """Test if a connection is working."""
        try:
            response = requests.post(
                f"{self.base_url}/connections/{conn_id}/ping",
                headers=self.headers
            )
            response.raise_for_status()
            result = response.json()
            return result.get('ping') == 'ok'
        except requests.exceptions.RequestException as e:
            print(f"Error testing connection: {e}")
            return False
    def create_stream(self, config: StreamConfig) -> Optional[Dict[str, Any]]:
        """Create a new stream configuration (returns a config_ ID, not a running stream)."""
        try:
            response = requests.post(
                f"{self.base_url}/stream-configs",
                headers=self.headers,
                json=config.to_payload()
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error creating stream configuration: {e}")
            return None
# Usage example showing the complete workflow
def main():
    api = DBConvertAPI("your_api_key_here")
    # Load user configurations (REQUIRED)
    if not api.load_user_configs():
        print("Failed to load user configurations")
        return
    # 1. Create source MySQL connection
    source_config = ConnectionConfig(
        name="mysql-source",
        type="mysql",
        host="localhost",
        port=3306,
        username="root",
        password="your_password",
        database="source_db"
    )
    source_conn = api.create_connection(source_config)
    if not source_conn:
        print("Failed to create source connection")
        return
    # 2. Create target PostgreSQL connection
    target_config = ConnectionConfig(
        name="postgres-target",
        type="postgresql",
        host="localhost",
        port=5432,
        username="postgres",
        password="your_password",
        database="target_db"
    )
    target_conn = api.create_connection(target_config)
    if not target_conn:
        print("Failed to create target connection")
        return
    # 3. Test both connections
    if not api.test_connection(source_conn['id']):
        print("Source connection test failed")
        return
    if not api.test_connection(target_conn['id']):
        print("Target connection test failed")
        return
    # 4. Create the stream configuration using connection IDs
    stream_config = StreamConfig(
        name="mysql_to_postgresql",
        source=source_conn['id'],  # Use source connection ID
        target=target_conn['id'],  # Use target connection ID
        mode="cdc",
        tables=[{
            "name": "users",
            "operations": ["insert", "update", "delete"]
        }],
        data_bundle_size=40,
        reporting_intervals={"source": 3, "target": 3},
        create_structure=True
    )
    stream = api.create_stream(stream_config)
    if stream:
        print(f"Successfully created stream configuration: {stream['id']}")
if __name__ == "__main__":
    main()
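The Python example stops after creating the stream configuration. The remaining steps of the workflow (starting streams from the configuration and managing them) could be sketched as follows. This version uses only the standard library and separates building a request from sending it. The helper names (`build_request`, `start_stream_request`, `stream_action_request`, `send`) are hypothetical; the paths and HTTP methods are the ones used in the curl and Node.js examples, and the response is assumed to be JSON.

```python
import json
import urllib.request
from typing import Any, Dict

BASE_URL = "http://127.0.0.1:8020/api/v1"

# HTTP method for each per-stream action used in this guide.
STREAM_ACTIONS = {"stats": "GET", "pause": "POST", "resume": "POST", "stop": "POST"}


def build_request(method: str, path: str, api_key: str,
                  base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) an authenticated API request."""
    return urllib.request.Request(
        url=f"{base_url}{path}",
        method=method,
        headers={"X-API-Key": api_key},
    )


def start_stream_request(config_id: str, api_key: str) -> urllib.request.Request:
    """POST /streams/{config_id}/start; the response carries a new stream_ ID."""
    return build_request("POST", f"/streams/{config_id}/start", api_key)


def stream_action_request(stream_id: str, action: str,
                          api_key: str) -> urllib.request.Request:
    """Build a stats, pause, resume, or stop request for one running stream."""
    if action not in STREAM_ACTIONS:
        raise ValueError(f"unsupported stream action: {action!r}")
    return build_request(STREAM_ACTIONS[action], f"/streams/{stream_id}/{action}", api_key)


def send(request: urllib.request.Request) -> Dict[str, Any]:
    """Send a request and parse the JSON body (raises on HTTP errors)."""
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))
```

A typical sequence would be `stream = send(start_stream_request(config_id, api_key))`, followed by `send(stream_action_request(stream["id"], "stats", api_key))`. Calling `start_stream_request` again with the same configuration ID requests a second, independent stream.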
Using Node.js
A Node.js implementation written in TypeScript, using axios with async/await:
import axios, { AxiosInstance } from 'axios';
interface StreamConfig {
name: string;
source: string;
target: string;
mode: 'convert' | 'cdc';
dataBundleSize?: number;
reportingIntervals?: {
source: number;
target: number;
};
tables: Array<{
name: string;
operations?: Array<'insert' | 'update' | 'delete'>;
}>;
createStructure?: boolean;
}
interface StreamResponse {
id: string;
name: string;
status: 'READY' | 'RUNNING' | 'PAUSED' | 'STOPPED' | 'FINISHED' | 'FAILED';
created?: number;
started?: number;
paused?: number;
resumed?: number;
stopped?: number;
}
interface StreamStats {
streamID: string;
source: NodeStats;
target: NodeStats;
}
interface NodeStats {
counter: number;
failedCounter: number;
prevDataSize: number;
sumDataSize: number;
reportingInterval: number;
start: string;
duration: string;
avgRate: number;
status: string;
}
class DBConvertStreamsAPI {
private client: AxiosInstance;
constructor(apiKey: string, baseURL: string = 'http://127.0.0.1:8020/api/v1') {
this.client = axios.create({
baseURL,
headers: {
'X-API-Key': apiKey,
'Content-Type': 'application/json'
}
});
}
async loadUserConfigs(): Promise<boolean> {
try {
await this.client.get('/user/configs');
return true;
} catch (error) {
console.error('Error loading user configurations:',
error.response?.data || error.message);
throw error;
}
}
async createStreamConfig(config: StreamConfig): Promise<StreamResponse> {
try {
const { data } = await this.client.post('/stream-configs', config);
return data;
} catch (error) {
console.error('Error creating stream configuration:',
error.response?.data || error.message);
throw error;
}
}
// Takes the config_ ID of a stream configuration; the response carries the new stream_ ID
async startStream(configId: string): Promise<StreamResponse> {
try {
const { data } = await this.client.post(`/streams/${configId}/start`);
return data;
} catch (error) {
console.error('Error starting stream:',
error.response?.data || error.message);
throw error;
}
}
async getStreamStats(streamId: string): Promise<StreamStats> {
try {
const { data } = await this.client.get(`/streams/${streamId}/stats`);
return data;
} catch (error) {
console.error('Error getting stream statistics:',
error.response?.data || error.message);
throw error;
}
}
async pauseStream(streamId: string): Promise<StreamResponse> {
try {
const { data } = await this.client.post(`/streams/${streamId}/pause`);
return data;
} catch (error) {
console.error('Error pausing stream:',
error.response?.data || error.message);
throw error;
}
}
async resumeStream(streamId: string): Promise<StreamResponse> {
try {
const { data } = await this.client.post(`/streams/${streamId}/resume`);
return data;
} catch (error) {
console.error('Error resuming stream:',
error.response?.data || error.message);
throw error;
}
}
async stopStream(streamId: string): Promise<StreamResponse> {
try {
const { data } = await this.client.post(`/streams/${streamId}/stop`);
return data;
} catch (error) {
console.error('Error stopping stream:',
error.response?.data || error.message);
throw error;
}
}
}
// Usage example
async function main() {
const api = new DBConvertStreamsAPI('your_api_key_here');
try {
// Load user configurations (REQUIRED)
await api.loadUserConfigs();
// Create a stream configuration
const streamConfig: StreamConfig = {
name: 'mysql_to_postgresql',
source: 'conn_source_id',
target: 'conn_target_id',
mode: 'cdc',
dataBundleSize: 40,
reportingIntervals: {
source: 3,
target: 3
},
tables: [
{
name: 'users',
operations: ['insert', 'update', 'delete']
}
],
createStructure: true
};
// Create the stream configuration (returns a config_ ID)
const newConfig = await api.createStreamConfig(streamConfig);
console.log('Created stream configuration:', newConfig.id);
// Start a stream from the configuration (returns a stream_ ID)
const startResult = await api.startStream(newConfig.id);
console.log('Stream started:', startResult.id, startResult.status);
// Monitor stream statistics using the stream ID
const stats = await api.getStreamStats(startResult.id);
console.log('Processed records:', stats.source.counter);
// Pause the stream
const pauseResult = await api.pauseStream(startResult.id);
console.log('Stream paused:', pauseResult.status);
// Resume the stream
const resumeResult = await api.resumeStream(startResult.id);
console.log('Stream resumed:', resumeResult.status);
} catch (error) {
console.error('Operation failed:', error);
}
}
main();
Using PowerShell
A PowerShell class implementation with strong typing:
class DBConvertStreamsAPI {
[string]$BaseURL
[hashtable]$Headers
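# PowerShell class constructors do not treat parameters with default values as optional, so the default URL is supplied by an overload
DBConvertStreamsAPI([string]$ApiKey) {
$this.Init($ApiKey, "http://127.0.0.1:8020/api/v1")
}
DBConvertStreamsAPI([string]$ApiKey, [string]$BaseURL) {
$this.Init($ApiKey, $BaseURL)
}
hidden [void] Init([string]$ApiKey, [string]$BaseURL) {
$this.BaseURL = $BaseURL
$this.Headers = @{
"X-API-Key" = $ApiKey
"Content-Type" = "application/json"
}
}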
[bool] LoadUserConfigs() {
try {
Invoke-RestMethod `
-Uri "$($this.BaseURL)/user/configs" `
-Headers $this.Headers `
-Method Get
return $true
}
catch {
Write-Error "Error loading user configurations: $_"
return $false
}
}
[object] CreateStreamConfig([hashtable]$ConfigData) {
try {
$body = $ConfigData | ConvertTo-Json -Depth 10
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/stream-configs" `
-Headers $this.Headers `
-Method Post `
-Body $body
return $response
}
catch {
Write-Error "Error creating stream configuration: $_"
return $null
}
}
# Takes the config_ ID of a stream configuration; the response carries the new stream_ ID
[object] StartStream([string]$ConfigId) {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/streams/$ConfigId/start" `
-Headers $this.Headers `
-Method Post
return $response
}
catch {
Write-Error "Error starting stream: $_"
return $null
}
}
[object] GetStreamStats([string]$StreamId) {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/streams/$StreamId/stats" `
-Headers $this.Headers `
-Method Get
return $response
}
catch {
Write-Error "Error getting stream statistics: $_"
return $null
}
}
[object] PauseStream([string]$StreamId) {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/streams/$StreamId/pause" `
-Headers $this.Headers `
-Method Post
return $response
}
catch {
Write-Error "Error pausing stream: $_"
return $null
}
}
[object] ResumeStream([string]$StreamId) {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/streams/$StreamId/resume" `
-Headers $this.Headers `
-Method Post
return $response
}
catch {
Write-Error "Error resuming stream: $_"
return $null
}
}
[object] StopStream([string]$StreamId) {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/streams/$StreamId/stop" `
-Headers $this.Headers `
-Method Post
return $response
}
catch {
Write-Error "Error stopping stream: $_"
return $null
}
}
}
# Usage example
$api = [DBConvertStreamsAPI]::new("your_api_key_here")
# Load user configurations (REQUIRED)
if (-not $api.LoadUserConfigs()) {
Write-Error "Failed to load user configurations"
return
}
# Create a stream configuration
$streamConfig = @{
name = "mysql_to_postgresql"
source = "conn_source_id"
target = "conn_target_id"
mode = "cdc"
dataBundleSize = 40
reportingIntervals = @{
source = 3
target = 3
}
tables = @(
@{
name = "users"
operations = @("insert", "update", "delete")
}
)
createStructure = $true
}
# Create the stream configuration (returns a config_ ID)
$newConfig = $api.CreateStreamConfig($streamConfig)
if ($newConfig) {
Write-Host "Created stream configuration: $($newConfig.id)"
# Start a stream from the configuration (returns a stream_ ID)
$startResult = $api.StartStream($newConfig.id)
if ($startResult) {
Write-Host "Stream started: $($startResult.id) ($($startResult.status))"
# Monitor stream statistics using the stream ID
$stats = $api.GetStreamStats($startResult.id)
if ($stats) {
Write-Host "Processed records: $($stats.source.counter)"
# Pause the stream
$pauseResult = $api.PauseStream($startResult.id)
if ($pauseResult) {
Write-Host "Stream paused: $($pauseResult.status)"
# Resume the stream
$resumeResult = $api.ResumeStream($startResult.id)
if ($resumeResult) {
Write-Host "Stream resumed: $($resumeResult.status)"
}
}
}
}
}
Error Handling
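The Python, Node.js, and PowerShell examples include basic error handling; the curl commands do not check responses:
- Network and HTTP errors are caught (requests.exceptions.RequestException in Python, rejected axios promises in Node.js, try/catch in PowerShell)
- Response status codes are checked (raise_for_status() in Python; axios and Invoke-RestMethod raise on non-2xx responses by default)
- Error messages are logged with details
- Type safety is enforced where possible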
Common error scenarios:
- 401: Invalid or missing API key
- 404: Stream not found
- 409: Cannot start stream (e.g., already running)
- 503: Service unavailable
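The status codes above can be turned into readable messages with a small lookup, sketched below. The function names `describe_status` and `is_retryable` are hypothetical. Treating only 503 as worth retrying is an assumption made for this illustration, not documented API behavior.

```python
# Status codes and meanings as listed in this guide.
KNOWN_ERRORS = {
    401: "Invalid or missing API key",
    404: "Stream not found",
    409: "Cannot start stream (e.g., already running)",
    503: "Service unavailable",
}

# Assumption: only a temporarily unavailable service is worth retrying.
RETRYABLE_STATUSES = {503}


def describe_status(status: int) -> str:
    """Map an HTTP status code to a human-readable message."""
    if 200 <= status < 300:
        return "OK"
    return KNOWN_ERRORS.get(status, f"Unexpected HTTP status {status}")


def is_retryable(status: int) -> bool:
    """Return True if repeating the same request later might succeed."""
    return status in RETRYABLE_STATUSES
```

A client could log `describe_status(response.status_code)` when a request fails and retry with a delay only when `is_retryable` returns True.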
Next Steps
- View the Stream API Reference for detailed endpoint documentation
- Learn about Stream States and Lifecycle
- Explore Stream Configuration Guide