1
1
import os
2
+ import shutil
3
+ import tarfile
4
+ import time
5
+ import numpy as np
2
6
3
7
from hls4ml .backends import VivadoBackend
4
- from hls4ml .model .flow import get_backend_flows , get_flow , register_flow
8
+ from hls4ml .model .flow import register_flow
9
+ from hls4ml .report import parse_vivado_report
5
10
6
11
class VivadoAcceleratorBackend (VivadoBackend ):
7
12
def __init__ (self ):
8
13
super (VivadoBackend , self ).__init__ (name = 'VivadoAccelerator' )
9
14
self ._register_flows ()
10
15
11
- def make_bitfile (model ):
12
- curr_dir = os .getcwd ()
13
- os .chdir (model .config .get_output_dir ())
14
- try :
15
- os .system ('vivado -mode batch -source design.tcl' )
16
- except :
17
- print ("Something went wrong, check the Vivado logs" )
18
- # These should work but Vivado seems to return before the files are written...
19
- # copyfile('{}_vivado_accelerator/project_1.runs/impl_1/design_1_wrapper.bit'.format(model.config.get_project_name()), './{}.bit'.format(model.config.get_project_name()))
20
- # copyfile('{}_vivado_accelerator/project_1.srcs/sources_1/bd/design_1/hw_handoff/design_1.hwh'.format(model.config.get_project_name()), './{}.hwh'.format(model.config.get_project_name()))
21
- os .chdir (curr_dir )
16
+ @staticmethod
17
+ def package (model , X = None , y = None , sleep_before_retry = 60 ):
18
+ '''Package the hardware build results for HW inference, including test data'''
19
+
20
+ odir = model .config .get_output_dir ()
21
+ name = model .config .get_project_name ()
22
+
23
+ if os .path .isdir (f'{ odir } /package/' ):
24
+ print (f'Found existing package "{ odir } /package/", overwriting' )
25
+ os .makedirs (f'{ odir } /package/' , exist_ok = True )
26
+ if not X is None :
27
+ np .save (f'{ odir } /package/X.npy' , X )
28
+ if not y is None :
29
+ np .save (f'{ odir } /package/y.npy' , y )
30
+
31
+ src = f'{ odir } /{ name } _vivado_accelerator/project_1.runs/impl_1/design_1_wrapper.bit'
32
+ dst = f'{ odir } /package/{ name } .bit'
33
+ _copy_wait_retry (src , dst , sleep = sleep_before_retry )
34
+
35
+ src = f'{ odir } /{ name } _vivado_accelerator/project_1.srcs/sources_1/bd/design_1/hw_handoff/design_1.hwh'
36
+ dst = f'{ odir } /package/{ name } .hwh'
37
+ _copy_wait_retry (src , dst , sleep = sleep_before_retry )
38
+
39
+ driver = model .config .backend .writer .vivado_accelerator_config .get_driver_file ()
40
+ shutil .copy (f'{ odir } /{ driver } ' , f'{ odir } /package/{ driver } ' )
41
+
42
+ _make_tarfile (f'{ odir } /{ name } .tar.gz' , f'{ odir } /package' )
43
+
44
+ def build (self , model , reset = False , csim = True , synth = True , cosim = False , validation = False , export = False , vsynth = False , bitfile = False ):
45
+ # run the VivadoBackend build
46
+ report = super ().build (model , reset = reset , csim = csim , synth = synth , cosim = cosim , validation = validation , export = export , vsynth = vsynth )
47
+ # now make a bitfile
48
+ if bitfile :
49
+ curr_dir = os .getcwd ()
50
+ os .chdir (model .config .get_output_dir ())
51
+ success = False
52
+ try :
53
+ os .system ('vivado -mode batch -source design.tcl' )
54
+ success = True
55
+ except :
56
+ print ("Something went wrong, check the Vivado logs" )
57
+ os .chdir (curr_dir )
58
+ if success :
59
+ VivadoAcceleratorBackend .package (model )
60
+
61
+ return parse_vivado_report (model .config .get_output_dir ())
22
62
23
63
def create_initial_config (self , board = 'pynq-z2' , part = None , clock_period = 5 , io_type = 'io_parallel' , interface = 'axi_stream' ,
24
64
driver = 'python' , input_type = 'float' , output_type = 'float' ):
@@ -56,11 +96,17 @@ def create_initial_config(self, board='pynq-z2', part=None, clock_period=5, io_t
56
96
return config
57
97
58
98
def _register_flows (self ):
59
- #TODO expand this to include new accelerator flow
60
- parent_flows = get_backend_flows (backend = 'vivado' )
61
- for flow_name in parent_flows :
62
- flow = get_flow (flow_name )
63
- acc_flow = register_flow (flow_name .replace ('vivado:' , '' ), flow .optimizers , requires = flow .requires , backend = self .name )
64
- if ':write' in flow_name :
65
- self ._writer_flow = acc_flow
66
- self ._default_flow = 'vivadoaccelerator:ip'
99
+ vivado_writer = ['vivado:write' ]
100
+ vivado_accel_writer = ['vivadoaccelerator:write_hls' ]
101
+ self ._writer_flow = register_flow ('write' , vivado_accel_writer , requires = vivado_writer , backend = self .name )
102
+ self ._default_flow = 'vivado:ip'
103
+
104
+ def _make_tarfile (output_filename , source_dir ):
105
+ with tarfile .open (output_filename , "w:gz" ) as tar :
106
+ tar .add (source_dir , arcname = os .path .basename (source_dir ))
107
+
108
+ def _copy_wait_retry (src , dst , sleep = 60 ):
109
+ if not os .path .isfile (src ):
110
+ print (f'File { src } not found, waiting { sleep } s before retry' )
111
+ time .sleep (sleep )
112
+ shutil .copy (src , dst )
0 commit comments