Skip to content

Commit 3f1dccd

Browse files
fix(batch-add): on batch add persist subblock values (#2819)
* fix(batch-add): on batch add persist subblock values * consolidate merge subblock * consolidate more code
1 parent 468ec2e commit 3f1dccd

File tree

6 files changed

+147
-144
lines changed

6 files changed

+147
-144
lines changed

apps/sim/background/webhook-execution.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ async function executeWebhookJobInternal(
172172
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
173173

174174
// Merge subblock states (matching workflow-execution pattern)
175-
const mergedStates = mergeSubblockState(blocks, {})
175+
const mergedStates = mergeSubblockState(blocks)
176176

177177
// Create serialized workflow
178178
const serializer = new Serializer()
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import type { BlockState, SubBlockState } from '@/stores/workflows/workflow/types'
2+
3+
export const DEFAULT_SUBBLOCK_TYPE = 'short-input'
4+
5+
/**
6+
* Merges subblock values into the provided subblock structures.
7+
* Falls back to a default subblock shape when a value has no structure.
8+
* @param subBlocks - Existing subblock definitions from the workflow
9+
* @param values - Stored subblock values keyed by subblock id
10+
* @returns Merged subblock structures with updated values
11+
*/
12+
export function mergeSubBlockValues(
13+
subBlocks: Record<string, unknown> | undefined,
14+
values: Record<string, unknown> | undefined
15+
): Record<string, unknown> {
16+
const merged = { ...(subBlocks || {}) } as Record<string, any>
17+
18+
if (!values) return merged
19+
20+
Object.entries(values).forEach(([subBlockId, value]) => {
21+
if (merged[subBlockId] && typeof merged[subBlockId] === 'object') {
22+
merged[subBlockId] = {
23+
...(merged[subBlockId] as Record<string, unknown>),
24+
value,
25+
}
26+
return
27+
}
28+
29+
merged[subBlockId] = {
30+
id: subBlockId,
31+
type: DEFAULT_SUBBLOCK_TYPE,
32+
value,
33+
}
34+
})
35+
36+
return merged
37+
}
38+
39+
/**
40+
* Merges workflow block states with explicit subblock values while maintaining block structure.
41+
* Values that are null or undefined do not override existing subblock values.
42+
* @param blocks - Block configurations from workflow state
43+
* @param subBlockValues - Subblock values keyed by blockId -> subBlockId -> value
44+
* @param blockId - Optional specific block ID to merge (merges all if not provided)
45+
* @returns Merged block states with updated subblocks
46+
*/
47+
export function mergeSubblockStateWithValues(
48+
blocks: Record<string, BlockState>,
49+
subBlockValues: Record<string, Record<string, unknown>> = {},
50+
blockId?: string
51+
): Record<string, BlockState> {
52+
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
53+
54+
return Object.entries(blocksToProcess).reduce(
55+
(acc, [id, block]) => {
56+
if (!block) {
57+
return acc
58+
}
59+
60+
const blockSubBlocks = block.subBlocks || {}
61+
const blockValues = subBlockValues[id] || {}
62+
const filteredValues = Object.fromEntries(
63+
Object.entries(blockValues).filter(([, value]) => value !== null && value !== undefined)
64+
)
65+
66+
const mergedSubBlocks = mergeSubBlockValues(blockSubBlocks, filteredValues) as Record<
67+
string,
68+
SubBlockState
69+
>
70+
71+
acc[id] = {
72+
...block,
73+
subBlocks: mergedSubBlocks,
74+
}
75+
76+
return acc
77+
},
78+
{} as Record<string, BlockState>
79+
)
80+
}

apps/sim/socket/database/operations.ts

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import postgres from 'postgres'
77
import { env } from '@/lib/core/config/env'
88
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
99
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
10+
import { mergeSubBlockValues } from '@/lib/workflows/subblocks'
1011
import {
1112
BLOCK_OPERATIONS,
1213
BLOCKS_OPERATIONS,
@@ -455,7 +456,7 @@ async function handleBlocksOperationTx(
455456
}
456457

457458
case BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS: {
458-
const { blocks, edges, loops, parallels } = payload
459+
const { blocks, edges, loops, parallels, subBlockValues } = payload
459460

460461
logger.info(`Batch adding blocks to workflow ${workflowId}`, {
461462
blockCount: blocks?.length || 0,
@@ -465,22 +466,30 @@ async function handleBlocksOperationTx(
465466
})
466467

467468
if (blocks && blocks.length > 0) {
468-
const blockValues = blocks.map((block: Record<string, unknown>) => ({
469-
id: block.id as string,
470-
workflowId,
471-
type: block.type as string,
472-
name: block.name as string,
473-
positionX: (block.position as { x: number; y: number }).x,
474-
positionY: (block.position as { x: number; y: number }).y,
475-
data: (block.data as Record<string, unknown>) || {},
476-
subBlocks: (block.subBlocks as Record<string, unknown>) || {},
477-
outputs: (block.outputs as Record<string, unknown>) || {},
478-
enabled: (block.enabled as boolean) ?? true,
479-
horizontalHandles: (block.horizontalHandles as boolean) ?? true,
480-
advancedMode: (block.advancedMode as boolean) ?? false,
481-
triggerMode: (block.triggerMode as boolean) ?? false,
482-
height: (block.height as number) || 0,
483-
}))
469+
const blockValues = blocks.map((block: Record<string, unknown>) => {
470+
const blockId = block.id as string
471+
const mergedSubBlocks = mergeSubBlockValues(
472+
block.subBlocks as Record<string, unknown>,
473+
subBlockValues?.[blockId]
474+
)
475+
476+
return {
477+
id: blockId,
478+
workflowId,
479+
type: block.type as string,
480+
name: block.name as string,
481+
positionX: (block.position as { x: number; y: number }).x,
482+
positionY: (block.position as { x: number; y: number }).y,
483+
data: (block.data as Record<string, unknown>) || {},
484+
subBlocks: mergedSubBlocks,
485+
outputs: (block.outputs as Record<string, unknown>) || {},
486+
enabled: (block.enabled as boolean) ?? true,
487+
horizontalHandles: (block.horizontalHandles as boolean) ?? true,
488+
advancedMode: (block.advancedMode as boolean) ?? false,
489+
triggerMode: (block.triggerMode as boolean) ?? false,
490+
height: (block.height as number) || 0,
491+
}
492+
})
484493

485494
await tx.insert(workflowBlocks).values(blockValues)
486495

apps/sim/stores/workflows/server-utils.ts

Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
* or React hooks, making it safe for use in Next.js API routes.
99
*/
1010

11-
import type { BlockState, SubBlockState } from '@/stores/workflows/workflow/types'
11+
import { mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
12+
import type { BlockState } from '@/stores/workflows/workflow/types'
1213

1314
/**
1415
* Server-safe version of mergeSubblockState for API routes
@@ -26,72 +27,7 @@ export function mergeSubblockState(
2627
subBlockValues: Record<string, Record<string, any>> = {},
2728
blockId?: string
2829
): Record<string, BlockState> {
29-
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
30-
31-
return Object.entries(blocksToProcess).reduce(
32-
(acc, [id, block]) => {
33-
// Skip if block is undefined
34-
if (!block) {
35-
return acc
36-
}
37-
38-
// Initialize subBlocks if not present
39-
const blockSubBlocks = block.subBlocks || {}
40-
41-
// Get stored values for this block
42-
const blockValues = subBlockValues[id] || {}
43-
44-
// Create a deep copy of the block's subBlocks to maintain structure
45-
const mergedSubBlocks = Object.entries(blockSubBlocks).reduce(
46-
(subAcc, [subBlockId, subBlock]) => {
47-
// Skip if subBlock is undefined
48-
if (!subBlock) {
49-
return subAcc
50-
}
51-
52-
// Get the stored value for this subblock
53-
const storedValue = blockValues[subBlockId]
54-
55-
// Create a new subblock object with the same structure but updated value
56-
subAcc[subBlockId] = {
57-
...subBlock,
58-
value: storedValue !== undefined && storedValue !== null ? storedValue : subBlock.value,
59-
}
60-
61-
return subAcc
62-
},
63-
{} as Record<string, SubBlockState>
64-
)
65-
66-
// Return the full block state with updated subBlocks
67-
acc[id] = {
68-
...block,
69-
subBlocks: mergedSubBlocks,
70-
}
71-
72-
// Add any values that exist in the provided values but aren't in the block structure
73-
// This handles cases where block config has been updated but values still exist
74-
Object.entries(blockValues).forEach(([subBlockId, value]) => {
75-
if (!mergedSubBlocks[subBlockId] && value !== null && value !== undefined) {
76-
// Create a minimal subblock structure
77-
mergedSubBlocks[subBlockId] = {
78-
id: subBlockId,
79-
type: 'short-input', // Default type that's safe to use
80-
value: value,
81-
}
82-
}
83-
})
84-
85-
// Update the block with the final merged subBlocks (including orphaned values)
86-
acc[id] = {
87-
...block,
88-
subBlocks: mergedSubBlocks,
89-
}
90-
91-
return acc
92-
},
93-
{} as Record<string, BlockState>
94-
)
30+
return mergeSubblockStateWithValues(blocks, subBlockValues, blockId)
9531
}
9632

9733
/**

apps/sim/stores/workflows/utils.ts

Lines changed: 35 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,7 @@
11
import type { Edge } from 'reactflow'
22
import { v4 as uuidv4 } from 'uuid'
3-
4-
export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
5-
return edgesToAdd.filter((edge) => {
6-
if (edge.source === edge.target) return false
7-
return !currentEdges.some(
8-
(e) =>
9-
e.source === edge.source &&
10-
e.sourceHandle === edge.sourceHandle &&
11-
e.target === edge.target &&
12-
e.targetHandle === edge.targetHandle
13-
)
14-
})
15-
}
16-
173
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
4+
import { mergeSubBlockValues, mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
185
import { getBlock } from '@/blocks'
196
import { normalizeName } from '@/executor/constants'
207
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
@@ -32,6 +19,19 @@ const WEBHOOK_SUBBLOCK_FIELDS = ['webhookId', 'triggerPath']
3219

3320
export { normalizeName }
3421

22+
export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
23+
return edgesToAdd.filter((edge) => {
24+
if (edge.source === edge.target) return false
25+
return !currentEdges.some(
26+
(e) =>
27+
e.source === edge.source &&
28+
e.sourceHandle === edge.sourceHandle &&
29+
e.target === edge.target &&
30+
e.targetHandle === edge.targetHandle
31+
)
32+
})
33+
}
34+
3535
export interface RegeneratedState {
3636
blocks: Record<string, BlockState>
3737
edges: Edge[]
@@ -201,27 +201,20 @@ export function prepareDuplicateBlockState(options: PrepareDuplicateBlockStateOp
201201
Object.entries(subBlockValues).filter(([key]) => !WEBHOOK_SUBBLOCK_FIELDS.includes(key))
202202
)
203203

204-
const mergedSubBlocks: Record<string, SubBlockState> = sourceBlock.subBlocks
204+
const baseSubBlocks: Record<string, SubBlockState> = sourceBlock.subBlocks
205205
? JSON.parse(JSON.stringify(sourceBlock.subBlocks))
206206
: {}
207207

208208
WEBHOOK_SUBBLOCK_FIELDS.forEach((field) => {
209-
if (field in mergedSubBlocks) {
210-
delete mergedSubBlocks[field]
209+
if (field in baseSubBlocks) {
210+
delete baseSubBlocks[field]
211211
}
212212
})
213213

214-
Object.entries(filteredSubBlockValues).forEach(([subblockId, value]) => {
215-
if (mergedSubBlocks[subblockId]) {
216-
mergedSubBlocks[subblockId].value = value as SubBlockState['value']
217-
} else {
218-
mergedSubBlocks[subblockId] = {
219-
id: subblockId,
220-
type: 'short-input',
221-
value: value as SubBlockState['value'],
222-
}
223-
}
224-
})
214+
const mergedSubBlocks = mergeSubBlockValues(baseSubBlocks, filteredSubBlockValues) as Record<
215+
string,
216+
SubBlockState
217+
>
225218

226219
const block: BlockState = {
227220
id: newId,
@@ -256,11 +249,16 @@ export function mergeSubblockState(
256249
workflowId?: string,
257250
blockId?: string
258251
): Record<string, BlockState> {
259-
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
260252
const subBlockStore = useSubBlockStore.getState()
261253

262254
const workflowSubblockValues = workflowId ? subBlockStore.workflowValues[workflowId] || {} : {}
263255

256+
if (workflowId) {
257+
return mergeSubblockStateWithValues(blocks, workflowSubblockValues, blockId)
258+
}
259+
260+
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
261+
264262
return Object.entries(blocksToProcess).reduce(
265263
(acc, [id, block]) => {
266264
if (!block) {
@@ -339,9 +337,15 @@ export async function mergeSubblockStateAsync(
339337
workflowId?: string,
340338
blockId?: string
341339
): Promise<Record<string, BlockState>> {
342-
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
343340
const subBlockStore = useSubBlockStore.getState()
344341

342+
if (workflowId) {
343+
const workflowValues = subBlockStore.workflowValues[workflowId] || {}
344+
return mergeSubblockStateWithValues(blocks, workflowValues, blockId)
345+
}
346+
347+
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
348+
345349
// Process blocks in parallel for better performance
346350
const processedBlockEntries = await Promise.all(
347351
Object.entries(blocksToProcess).map(async ([id, block]) => {
@@ -358,16 +362,7 @@ export async function mergeSubblockStateAsync(
358362
return null
359363
}
360364

361-
let storedValue = null
362-
363-
if (workflowId) {
364-
const workflowValues = subBlockStore.workflowValues[workflowId]
365-
if (workflowValues?.[id]) {
366-
storedValue = workflowValues[id][subBlockId]
367-
}
368-
} else {
369-
storedValue = subBlockStore.getValue(id, subBlockId)
370-
}
365+
const storedValue = subBlockStore.getValue(id, subBlockId)
371366

372367
return [
373368
subBlockId,
@@ -386,23 +381,6 @@ export async function mergeSubblockStateAsync(
386381
subBlockEntries.filter((entry): entry is readonly [string, SubBlockState] => entry !== null)
387382
) as Record<string, SubBlockState>
388383

389-
// Add any values that exist in the store but aren't in the block structure
390-
// This handles cases where block config has been updated but values still exist
391-
// IMPORTANT: This includes runtime subblock IDs like webhookId, triggerPath, etc.
392-
if (workflowId) {
393-
const workflowValues = subBlockStore.workflowValues[workflowId]
394-
const blockValues = workflowValues?.[id] || {}
395-
Object.entries(blockValues).forEach(([subBlockId, value]) => {
396-
if (!mergedSubBlocks[subBlockId] && value !== null && value !== undefined) {
397-
mergedSubBlocks[subBlockId] = {
398-
id: subBlockId,
399-
type: 'short-input',
400-
value: value as SubBlockState['value'],
401-
}
402-
}
403-
})
404-
}
405-
406384
// Return the full block state with updated subBlocks (including orphaned values)
407385
return [
408386
id,

0 commit comments

Comments
 (0)