30
30
import io .cdap .delta .plugin .mock .BlockingEventEmitter ;
31
31
import io .cdap .delta .plugin .mock .MockContext ;
32
32
import io .cdap .delta .plugin .mock .MockEventEmitter ;
33
+ import org .junit .AfterClass ;
33
34
import org .junit .Assert ;
34
35
import org .junit .BeforeClass ;
35
36
import org .junit .Test ;
41
42
import java .sql .Date ;
42
43
import java .sql .DriverManager ;
43
44
import java .sql .PreparedStatement ;
45
+ import java .sql .SQLException ;
44
46
import java .sql .Statement ;
45
47
import java .time .LocalDate ;
46
48
import java .util .Collections ;
57
59
* are some classloading issues due to copied debezium classes.
58
60
*/
59
61
public class MySqlEventReaderIntegrationTest {
60
- private static final String DB = "test" ;
62
+ private static final int CONSUMER_ID = 13 ;
61
63
private static final String CUSTOMERS_TABLE = "customers" ;
62
64
private static final Schema CUSTOMERS_SCHEMA = Schema .recordOf (
63
65
"customers" ,
@@ -69,9 +71,14 @@ public class MySqlEventReaderIntegrationTest {
69
71
BINARYCOL_TABLE ,
70
72
Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
71
73
Schema .Field .of ("bincol" , Schema .of (Schema .Type .STRING )));
74
+ private static final String HOST = "localhost" ;
75
+ private static final String DB = "test" ;
76
+ private static final String USER = "root" ;
72
77
73
78
private static String password ;
74
79
private static int port ;
80
+ private static Properties connProperties ;
81
+ private static String connectionUrl ;
75
82
76
83
77
84
@ BeforeClass
@@ -84,10 +91,10 @@ public static void setupClass() throws Exception {
84
91
}
85
92
port = Integer .parseInt (properties .getProperty ("mysql.port" ));
86
93
87
- Properties connProperties = new Properties ();
88
- connProperties .put ("user" , "root" );
94
+ connProperties = new Properties ();
95
+ connProperties .put ("user" , USER );
89
96
connProperties .put ("password" , password );
90
- String connectionUrl = String .format ("jdbc:mysql://localhost:%d" , port );
97
+ connectionUrl = String .format ("jdbc:mysql://localhost:%d" , port );
91
98
DriverManager .getDriver (connectionUrl );
92
99
93
100
// wait until a connection can be established
@@ -146,14 +153,24 @@ public static void setupClass() throws Exception {
146
153
}
147
154
}
148
155
156
+ @ AfterClass
157
+ public static void tearDown () throws SQLException {
158
+ // drop database
159
+ try (Connection connection = DriverManager .getConnection (connectionUrl , connProperties )) {
160
+ try (Statement statement = connection .createStatement ()) {
161
+ statement .execute ("DROP DATABASE " + DB );
162
+ }
163
+ }
164
+ }
165
+
149
166
@ Test
150
167
public void test () throws InterruptedException {
151
168
SourceTable sourceTable = new SourceTable (DB , CUSTOMERS_TABLE , null ,
152
169
Collections .emptySet (), Collections .emptySet (), Collections .emptySet ());
153
170
154
171
DeltaSourceContext context = new MockContext (Driver .class );
155
- MockEventEmitter eventEmitter = new MockEventEmitter (6 );
156
- MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
172
+ MockEventEmitter eventEmitter = new MockEventEmitter (7 );
173
+ MySqlConfig config = new MySqlConfig (HOST , port , USER , password , CONSUMER_ID , DB ,
157
174
TimeZone .getDefault ().getID ());
158
175
159
176
MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
@@ -164,7 +181,7 @@ public void test() throws InterruptedException {
164
181
eventEmitter .waitForExpectedEvents (30 , TimeUnit .SECONDS );
165
182
166
183
Assert .assertEquals (4 , eventEmitter .getDdlEvents ().size ());
167
- Assert .assertEquals (2 , eventEmitter .getDmlEvents ().size ());
184
+ Assert .assertEquals (3 , eventEmitter .getDmlEvents ().size ());
168
185
169
186
DDLEvent ddlEvent = eventEmitter .getDdlEvents ().get (0 );
170
187
Assert .assertEquals (DDLOperation .Type .DROP_TABLE , ddlEvent .getOperation ().getType ());
@@ -184,14 +201,20 @@ public void test() throws InterruptedException {
184
201
Assert .assertEquals (DB , ddlEvent .getOperation ().getDatabaseName ());
185
202
Assert .assertEquals (CUSTOMERS_TABLE , ddlEvent .getOperation ().getTableName ());
186
203
Assert .assertEquals (Collections .singletonList ("id" ), ddlEvent .getPrimaryKey ());
204
+
187
205
Assert .assertEquals (CUSTOMERS_SCHEMA , ddlEvent .getSchema ());
188
206
189
207
DMLEvent dmlEvent = eventEmitter .getDmlEvents ().get (0 );
190
208
Assert .assertEquals (DMLOperation .Type .INSERT , dmlEvent .getOperation ().getType ());
191
209
Assert .assertEquals (DB , dmlEvent .getOperation ().getDatabaseName ());
192
210
Assert .assertEquals (CUSTOMERS_TABLE , dmlEvent .getOperation ().getTableName ());
193
211
StructuredRecord row = dmlEvent .getRow ();
194
- StructuredRecord expected = StructuredRecord .builder (CUSTOMERS_SCHEMA )
212
+
213
+ // Take schema name from the row as it is generated by debezium
214
+ // so it can be different from the one in our schema but does not impact data
215
+ Schema expectedSchema = Schema .recordOf (row .getSchema ().getRecordName (), CUSTOMERS_SCHEMA .getFields ());
216
+
217
+ StructuredRecord expected = StructuredRecord .builder (expectedSchema )
195
218
.set ("id" , 0 )
196
219
.set ("name" , "alice" )
197
220
.setDate ("bday" , LocalDate .ofEpochDay (0 ))
@@ -203,7 +226,7 @@ public void test() throws InterruptedException {
203
226
Assert .assertEquals (DB , dmlEvent .getOperation ().getDatabaseName ());
204
227
Assert .assertEquals (CUSTOMERS_TABLE , dmlEvent .getOperation ().getTableName ());
205
228
row = dmlEvent .getRow ();
206
- expected = StructuredRecord .builder (CUSTOMERS_SCHEMA )
229
+ expected = StructuredRecord .builder (expectedSchema )
207
230
.set ("id" , 1 )
208
231
.set ("name" , "bob" )
209
232
.setDate ("bday" , LocalDate .ofEpochDay (365 ))
@@ -215,7 +238,7 @@ public void test() throws InterruptedException {
215
238
Assert .assertEquals (DB , dmlEvent .getOperation ().getDatabaseName ());
216
239
Assert .assertEquals (CUSTOMERS_TABLE , dmlEvent .getOperation ().getTableName ());
217
240
row = dmlEvent .getRow ();
218
- expected = StructuredRecord .builder (CUSTOMERS_SCHEMA )
241
+ expected = StructuredRecord .builder (expectedSchema )
219
242
.set ("id" , 2 )
220
243
.set ("name" , "tim" )
221
244
.setDate ("bday" , null )
@@ -232,7 +255,7 @@ public void stopReaderTest() throws Exception {
232
255
BlockingQueue <DDLEvent > ddlEvents = new ArrayBlockingQueue <>(1 );
233
256
BlockingQueue <DMLEvent > dmlEvents = new ArrayBlockingQueue <>(1 );
234
257
EventEmitter eventEmitter = new BlockingEventEmitter (ddlEvents , dmlEvents );
235
- MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
258
+ MySqlConfig config = new MySqlConfig (HOST , port , USER , password , CONSUMER_ID , DB ,
236
259
TimeZone .getDefault ().getID ());
237
260
238
261
MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
@@ -264,7 +287,7 @@ public void testBinaryHandlingModebyDebezium() throws InterruptedException {
264
287
context .addRuntimeArgument (MySqlEventReader .SOURCE_CONNECTOR_PREFIX + "binary.handling.mode" , "HEX" );
265
288
266
289
MockEventEmitter eventEmitter = new MockEventEmitter (4 );
267
- MySqlConfig config = new MySqlConfig ("localhost" , port , "root" , password , 13 , DB ,
290
+ MySqlConfig config = new MySqlConfig (HOST , port , USER , password , CONSUMER_ID , DB ,
268
291
TimeZone .getDefault ().getID ());
269
292
270
293
MySqlEventReader eventReader = new MySqlEventReader (Collections .singleton (sourceTable ), config ,
0 commit comments