1
+ /**
2
+ * Licensed to the Apache Software Foundation (ASF) under one
3
+ * or more contributor license agreements. See the NOTICE file
4
+ * distributed with this work for additional information
5
+ * regarding copyright ownership. The ASF licenses this file
6
+ * to you under the Apache License, Version 2.0 (the
7
+ * "License"); you may not use this file except in compliance
8
+ * with the License. You may obtain a copy of the License at
9
+ *
10
+ * http://www.apache.org/licenses/LICENSE-2.0
11
+ *
12
+ * Unless required by applicable law or agreed to in writing, software
13
+ * distributed under the License is distributed on an "AS IS" BASIS,
14
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
+ * See the License for the specific language governing permissions and
16
+ * limitations under the License.
17
+ */
18
+ package org .apache .hadoop .hdfs .server .federation .router ;
19
+
20
+ import org .apache .hadoop .conf .Configuration ;
21
+ import org .apache .hadoop .fs .FSDataOutputStream ;
22
+ import org .apache .hadoop .fs .FileSystem ;
23
+ import org .apache .hadoop .fs .Path ;
24
+ import org .apache .hadoop .fs .permission .FsPermission ;
25
+ import org .apache .hadoop .hdfs .protocol .BlockStoragePolicy ;
26
+ import org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster ;
27
+ import org .apache .hadoop .hdfs .server .federation .MockResolver ;
28
+ import org .apache .hadoop .hdfs .server .federation .RouterConfigBuilder ;
29
+ import org .apache .hadoop .ipc .CallerContext ;
30
+ import org .junit .After ;
31
+ import org .junit .AfterClass ;
32
+ import org .junit .Before ;
33
+ import org .junit .BeforeClass ;
34
+ import org .junit .Test ;
35
+ import org .mockito .Mockito ;
36
+
37
+ import java .io .IOException ;
38
+ import java .util .concurrent .TimeUnit ;
39
+
40
+ import static org .apache .hadoop .hdfs .server .federation .FederationTestUtils .NAMENODES ;
41
+ import static org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster .DEFAULT_HEARTBEAT_INTERVAL_MS ;
42
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ;
43
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ;
44
+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .syncReturn ;
45
+ import static org .junit .Assert .assertArrayEquals ;
46
+ import static org .junit .Assert .assertEquals ;
47
+ import static org .junit .Assert .assertNotEquals ;
48
+ import static org .junit .Assert .assertTrue ;
49
+
50
+ public class TestRouterAsyncStoragePolicy {
51
+ private static Configuration routerConf ;
52
+ /** Federated HDFS cluster. */
53
+ private static MiniRouterDFSCluster cluster ;
54
+ private static String ns0 ;
55
+
56
+ /** Random Router for this federated cluster. */
57
+ private MiniRouterDFSCluster .RouterContext router ;
58
+ private FileSystem routerFs ;
59
+ private RouterRpcServer routerRpcServer ;
60
+ private RouterAsyncStoragePolicy asyncStoragePolicy ;
61
+
62
+ private final String testfilePath = "/testdir/testAsyncStoragePolicy.file" ;
63
+
64
+ @ BeforeClass
65
+ public static void setUpCluster () throws Exception {
66
+ cluster = new MiniRouterDFSCluster (true , 1 , 2 ,
67
+ DEFAULT_HEARTBEAT_INTERVAL_MS , 1000 );
68
+ cluster .setNumDatanodesPerNameservice (3 );
69
+
70
+ cluster .startCluster ();
71
+
72
+ // Making one Namenode active per nameservice
73
+ if (cluster .isHighAvailability ()) {
74
+ for (String ns : cluster .getNameservices ()) {
75
+ cluster .switchToActive (ns , NAMENODES [0 ]);
76
+ cluster .switchToStandby (ns , NAMENODES [1 ]);
77
+ }
78
+ }
79
+ // Start routers with only an RPC service
80
+ routerConf = new RouterConfigBuilder ()
81
+ .rpc ()
82
+ .build ();
83
+
84
+ // Reduce the number of RPC clients threads to overload the Router easy
85
+ routerConf .setInt (RBFConfigKeys .DFS_ROUTER_CLIENT_THREADS_SIZE , 1 );
86
+ routerConf .setInt (DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT , 1 );
87
+ routerConf .setInt (DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT , 1 );
88
+ // We decrease the DN cache times to make the test faster
89
+ routerConf .setTimeDuration (
90
+ RBFConfigKeys .DN_REPORT_CACHE_EXPIRE , 1 , TimeUnit .SECONDS );
91
+ cluster .addRouterOverrides (routerConf );
92
+ // Start routers with only an RPC service
93
+ cluster .startRouters ();
94
+
95
+ // Register and verify all NNs with all routers
96
+ cluster .registerNamenodes ();
97
+ cluster .waitNamenodeRegistration ();
98
+ cluster .waitActiveNamespaces ();
99
+ ns0 = cluster .getNameservices ().get (0 );
100
+ }
101
+
102
+ @ AfterClass
103
+ public static void shutdownCluster () throws Exception {
104
+ if (cluster != null ) {
105
+ cluster .shutdown ();
106
+ }
107
+ }
108
+
109
+ @ Before
110
+ public void setUp () throws IOException {
111
+ router = cluster .getRandomRouter ();
112
+ routerFs = router .getFileSystem ();
113
+ routerRpcServer = router .getRouterRpcServer ();
114
+ routerRpcServer .initAsyncThreadPool ();
115
+ RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient (
116
+ routerConf , router .getRouter (), routerRpcServer .getNamenodeResolver (),
117
+ routerRpcServer .getRPCMonitor (),
118
+ routerRpcServer .getRouterStateIdContext ());
119
+ RouterRpcServer spy = Mockito .spy (routerRpcServer );
120
+ Mockito .when (spy .getRPCClient ()).thenReturn (asyncRpcClient );
121
+ asyncStoragePolicy = new RouterAsyncStoragePolicy (spy );
122
+
123
+ // Create mock locations
124
+ MockResolver resolver = (MockResolver ) router .getRouter ().getSubclusterResolver ();
125
+ resolver .addLocation ("/" , ns0 , "/" );
126
+ FsPermission permission = new FsPermission ("705" );
127
+ routerFs .mkdirs (new Path ("/testdir" ), permission );
128
+ FSDataOutputStream fsDataOutputStream = routerFs .create (
129
+ new Path (testfilePath ), true );
130
+ fsDataOutputStream .write (new byte [1024 ]);
131
+ fsDataOutputStream .close ();
132
+ }
133
+
134
+ @ After
135
+ public void tearDown () throws IOException {
136
+ // clear client context
137
+ CallerContext .setCurrent (null );
138
+ boolean delete = routerFs .delete (new Path ("/testdir" ));
139
+ assertTrue (delete );
140
+ if (routerFs != null ) {
141
+ routerFs .close ();
142
+ }
143
+ }
144
+
145
+ @ Test
146
+ public void testRouterAsyncStoragePolicy () throws Exception {
147
+ BlockStoragePolicy [] storagePolicies = cluster .getNamenodes ().get (0 )
148
+ .getClient ().getStoragePolicies ();
149
+ asyncStoragePolicy .getStoragePolicies ();
150
+ BlockStoragePolicy [] storagePoliciesAsync = syncReturn (BlockStoragePolicy [].class );
151
+ assertArrayEquals (storagePolicies , storagePoliciesAsync );
152
+
153
+ asyncStoragePolicy .getStoragePolicy (testfilePath );
154
+ BlockStoragePolicy blockStoragePolicy1 = syncReturn (BlockStoragePolicy .class );
155
+
156
+ asyncStoragePolicy .setStoragePolicy (testfilePath , "COLD" );
157
+ syncReturn (null );
158
+ asyncStoragePolicy .getStoragePolicy (testfilePath );
159
+ BlockStoragePolicy blockStoragePolicy2 = syncReturn (BlockStoragePolicy .class );
160
+ assertNotEquals (blockStoragePolicy1 , blockStoragePolicy2 );
161
+ assertEquals ("COLD" , blockStoragePolicy2 .getName ());
162
+ }
163
+ }
0 commit comments