17
17
*/
18
18
package org .apache .hadoop .hdfs .server .federation .router .async ;
19
19
20
- import org .apache .hadoop .conf .Configuration ;
21
- import org .apache .hadoop .fs .FileSystem ;
22
- import org .apache .hadoop .fs .Path ;
23
- import org .apache .hadoop .fs .permission .FsPermission ;
24
20
import org .apache .hadoop .hdfs .protocol .BlockStoragePolicy ;
25
21
import org .apache .hadoop .hdfs .protocol .DatanodeInfo ;
26
22
import org .apache .hadoop .hdfs .protocol .HdfsConstants ;
27
- import org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster ;
28
- import org .apache .hadoop .hdfs .server .federation .MockResolver ;
29
- import org .apache .hadoop .hdfs .server .federation .RouterConfigBuilder ;
30
23
import org .apache .hadoop .hdfs .server .federation .resolver .RemoteLocation ;
31
- import org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys ;
32
24
import org .apache .hadoop .hdfs .server .federation .router .RemoteMethod ;
33
25
import org .apache .hadoop .hdfs .server .federation .router .RouterRpcServer ;
34
26
import org .apache .hadoop .hdfs .server .protocol .DatanodeStorageReport ;
35
- import org .apache .hadoop .ipc .CallerContext ;
36
- import org .junit .After ;
37
- import org .junit .AfterClass ;
38
27
import org .junit .Before ;
39
- import org .junit .BeforeClass ;
40
28
import org .junit .Test ;
41
- import org .mockito .Mockito ;
42
29
43
30
import java .io .IOException ;
44
31
import java .util .List ;
45
32
import java .util .Map ;
46
- import java .util .concurrent .TimeUnit ;
47
33
48
- import static org .apache .hadoop .hdfs .server .federation .FederationTestUtils .NAMENODES ;
49
- import static org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster .DEFAULT_HEARTBEAT_INTERVAL_MS ;
50
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ;
51
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ;
52
34
import static org .apache .hadoop .hdfs .server .federation .router .async .utils .AsyncUtil .syncReturn ;
53
35
import static org .junit .Assert .assertEquals ;
54
36
import static org .junit .Assert .assertNotNull ;
55
- import static org .junit .Assert .assertTrue ;
56
37
57
38
/**
58
39
* Used to test the async functionality of {@link RouterRpcServer}.
59
40
*/
60
- public class TestRouterAsyncRpcServer {
61
- private static Configuration routerConf ;
62
- /** Federated HDFS cluster. */
63
- private static MiniRouterDFSCluster cluster ;
64
- private static String ns0 ;
65
-
66
- /** Random Router for this federated cluster. */
67
- private MiniRouterDFSCluster .RouterContext router ;
68
- private FileSystem routerFs ;
41
+ public class TestRouterAsyncRpcServer extends RouterAsyncProtocolTestBase {
69
42
private RouterRpcServer asyncRouterRpcServer ;
70
43
71
- @ BeforeClass
72
- public static void setUpCluster () throws Exception {
73
- cluster = new MiniRouterDFSCluster (true , 1 , 2 ,
74
- DEFAULT_HEARTBEAT_INTERVAL_MS , 1000 );
75
- cluster .setNumDatanodesPerNameservice (3 );
76
-
77
- cluster .startCluster ();
78
-
79
- // Making one Namenode active per nameservice
80
- if (cluster .isHighAvailability ()) {
81
- for (String ns : cluster .getNameservices ()) {
82
- cluster .switchToActive (ns , NAMENODES [0 ]);
83
- cluster .switchToStandby (ns , NAMENODES [1 ]);
84
- }
85
- }
86
- // Start routers with only an RPC service
87
- routerConf = new RouterConfigBuilder ()
88
- .rpc ()
89
- .build ();
90
-
91
- // Reduce the number of RPC clients threads to overload the Router easy
92
- routerConf .setInt (RBFConfigKeys .DFS_ROUTER_CLIENT_THREADS_SIZE , 1 );
93
- routerConf .setInt (DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT , 1 );
94
- routerConf .setInt (DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT , 1 );
95
- // We decrease the DN cache times to make the test faster
96
- routerConf .setTimeDuration (
97
- RBFConfigKeys .DN_REPORT_CACHE_EXPIRE , 1 , TimeUnit .SECONDS );
98
- cluster .addRouterOverrides (routerConf );
99
- // Start routers with only an RPC service
100
- cluster .startRouters ();
101
-
102
- // Register and verify all NNs with all routers
103
- cluster .registerNamenodes ();
104
- cluster .waitNamenodeRegistration ();
105
- cluster .waitActiveNamespaces ();
106
- ns0 = cluster .getNameservices ().get (0 );
107
- }
108
-
109
- @ AfterClass
110
- public static void shutdownCluster () throws Exception {
111
- if (cluster != null ) {
112
- cluster .shutdown ();
113
- }
114
- }
115
-
116
44
@ Before
117
- public void setUp () throws IOException {
118
- router = cluster .getRandomRouter ();
119
- routerFs = router .getFileSystem ();
120
- RouterRpcServer routerRpcServer = router .getRouterRpcServer ();
121
- routerRpcServer .initAsyncThreadPool ();
122
- RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient (
123
- routerConf , router .getRouter (), routerRpcServer .getNamenodeResolver (),
124
- routerRpcServer .getRPCMonitor (),
125
- routerRpcServer .getRouterStateIdContext ());
126
- asyncRouterRpcServer = Mockito .spy (routerRpcServer );
127
- Mockito .when (asyncRouterRpcServer .getRPCClient ()).thenReturn (asyncRpcClient );
128
-
129
- // Create mock locations
130
- MockResolver resolver = (MockResolver ) router .getRouter ().getSubclusterResolver ();
131
- resolver .addLocation ("/" , ns0 , "/" );
132
- FsPermission permission = new FsPermission ("705" );
133
- routerFs .mkdirs (new Path ("/testdir" ), permission );
134
- }
135
-
136
- @ After
137
- public void tearDown () throws IOException {
138
- // clear client context
139
- CallerContext .setCurrent (null );
140
- boolean delete = routerFs .delete (new Path ("/testdir" ));
141
- assertTrue (delete );
142
- if (routerFs != null ) {
143
- routerFs .close ();
144
- }
45
+ public void setup () throws IOException {
46
+ asyncRouterRpcServer = getRouterAsyncRpcServer ();
145
47
}
146
48
147
49
/**
@@ -165,7 +67,7 @@ public void testGetCreateLocationAsync() throws Exception {
165
67
asyncRouterRpcServer .getCreateLocationAsync ("/testdir" , locations );
166
68
RemoteLocation remoteLocation = syncReturn (RemoteLocation .class );
167
69
assertNotNull (remoteLocation );
168
- assertEquals (ns0 , remoteLocation .getNameserviceId ());
70
+ assertEquals (getNs0 () , remoteLocation .getNameserviceId ());
169
71
}
170
72
171
73
/**
@@ -182,7 +84,7 @@ public void testGetDatanodeReportAsync() throws Exception {
182
84
asyncRouterRpcServer .getDatanodeStorageReportMapAsync (HdfsConstants .DatanodeReportType .ALL );
183
85
Map <String , DatanodeStorageReport []> map = syncReturn (Map .class );
184
86
assertEquals (1 , map .size ());
185
- assertEquals (3 , map .get (ns0 ).length );
87
+ assertEquals (3 , map .get (getNs0 () ).length );
186
88
187
89
DatanodeInfo [] slowDatanodeReport1 =
188
90
asyncRouterRpcServer .getSlowDatanodeReport (true , 0 );
0 commit comments