@@ -29,7 +29,7 @@ import { useDebouncedCallback } from 'use-debounce'
29
29
import { removePrefix } from 'src/utils'
30
30
import { useReplacePlaceholder } from 'src/streaming/builder/hooks/useSqlPlaceholderClick'
31
31
import { projectToProjectWithSchema } from 'src/types/program'
32
- import { useAddConnector } from 'src/streaming/builder/hooks/useAddIoNode'
32
+ import { connectorConnects , useAddConnector } from 'src/streaming/builder/hooks/useAddIoNode'
33
33
import MissingSchemaDialog from 'src/streaming/builder/NoSchemaDialog'
34
34
import useStatusNotification from 'src/components/errors/useStatusNotification'
35
35
@@ -107,7 +107,10 @@ export const PipelineWithProvider = (props: {
107
107
setConfig ( configQuery . data . config )
108
108
setSaveState ( 'isUpToDate' )
109
109
110
- console . log ( configQuery . data . attached_connectors )
110
+ const attachedConnectors = configQuery . data . attached_connectors
111
+ let invalidConnections : AttachedConnector [ ] = [ ]
112
+ let validConnections : AttachedConnector [ ] = attachedConnectors
113
+ console . log ( attachedConnectors )
111
114
112
115
// We don't set saveState here because we don't want to override the
113
116
// saveState (when we get the query result back). Otherwise it will cancel
@@ -123,14 +126,32 @@ export const PipelineWithProvider = (props: {
123
126
}
124
127
125
128
const programWithSchema = projectToProjectWithSchema ( foundProject )
129
+ if ( attachedConnectors ) {
130
+ invalidConnections = attachedConnectors . filter ( attached_connector => {
131
+ return ! connectorConnects ( attached_connector , programWithSchema . schema )
132
+ } )
133
+ validConnections = attachedConnectors . filter ( attached_connector => {
134
+ return connectorConnects ( attached_connector , programWithSchema . schema )
135
+ } )
136
+ }
137
+
126
138
setProject ( programWithSchema )
127
139
replacePlaceholder ( programWithSchema )
128
140
}
129
141
}
130
142
131
- if ( configQuery . data . attached_connectors ) {
132
- const attachedConnectors = configQuery . data . attached_connectors
133
- attachedConnectors . forEach ( attached_connector => {
143
+ if ( invalidConnections . length > 0 ) {
144
+ pushMessage ( {
145
+ key : new Date ( ) . getTime ( ) ,
146
+ color : 'warning' ,
147
+ message : `Could not attach ${
148
+ invalidConnections . length
149
+ } connector(s): No tables/views named ${ invalidConnections . map ( c => c . config ) . join ( ', ' ) } found.`
150
+ } )
151
+ }
152
+
153
+ if ( validConnections ) {
154
+ validConnections . forEach ( attached_connector => {
134
155
const connector = connectorQuery . data . find (
135
156
connector => connector . connector_id === attached_connector . connector_id
136
157
)
@@ -165,7 +186,8 @@ export const PipelineWithProvider = (props: {
165
186
setProject ,
166
187
replacePlaceholder ,
167
188
addConnector ,
168
- configId
189
+ configId ,
190
+ pushMessage
169
191
] )
170
192
171
193
const debouncedSave = useDebouncedCallback ( ( ) => {
@@ -205,8 +227,6 @@ export const PipelineWithProvider = (props: {
205
227
}
206
228
)
207
229
} else {
208
- console . log ( 'update existing config' )
209
-
210
230
// Update an existing config
211
231
const connectors : Array < AttachedConnector > = getEdges ( ) . map ( edge => {
212
232
const source = getNode ( edge . source )
0 commit comments