path: root/third_party/bazel/rules_haskell/debug/linking_utils/ldd.py
import subprocess
import os
import re


### helper functions

def list_to_dict(f, l):
    """dict with elements of list as keys & as values transformed by f"""
    d = {}
    for el in l:
        d[el] = f(el)
    return d
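
# For example, list_to_dict(len, ["a", "bb"]) returns {"a": 1, "bb": 2}.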

def dict_remove_empty(d):
    """remove keys that have [] or {} as values"""
    new = {}
    for k, v in d.items():
        if not (v == [] or v == {}):
            new[k] = v
    return new

def identity(x):
    """identity function"""
    return x

def const(x):
    """(curried) constant function"""
    def f(y):
        return x
    return f

def memoized(cache, f, arg):
    """Memoizes a call to `f` with `arg` in the dict `cache`.
    Modifies the cache dict in place."""
    if arg in cache:
        return cache[arg]
    res = f(arg)
    cache[arg] = res
    return res
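
# A quick sketch of how `memoized` is meant to be used (hypothetical values):
#
#   cache = {}
#   memoized(cache, len, "abc")  # calls len("abc") and stores 3 in the cache
#   memoized(cache, len, "abc")  # second call hits the cache, no len() call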

### IO functions that find elf dependencies

_field_matcher = re.compile(b"  ([A-Z0-9_]+) +(.*)$")

def read_dynamic_fields(elf_path):
    """Read the dynamic header fields from an elf binary

    Args:
      elf_path: path to the elf binary (either absolute or relative to pwd)

    Returns:
      a list [(field_key, field_value)] where a field_key can appear multiple
      times (for example, there is usually more than one NEEDED field).
    """
    res = subprocess.check_output([
        # force locale to C for stable output
        "env", "LC_ALL=C",
        "objdump",
        # specifying the section brings execution time down from 150ms to 10ms
        "--section=.dynamic",
        "--all-headers",
        elf_path
    ])
    to_end = res.split(b"Dynamic Section:\n")[1]
    # to first empty line
    dyn_section = to_end[: 1 + to_end.find(b"\n\n")]
    def read_dynamic_field(s):
        """return (field_key, field_value)"""
        return _field_matcher.match(s).groups()
    return list(map(read_dynamic_field, dyn_section.splitlines(True)))
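
# Illustrative return value (hypothetical binary; the exact fields depend on
# how it was linked):
#
#   [(b"NEEDED", b"libc.so.6"), (b"RUNPATH", b"$ORIGIN/../lib")]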

def __query_dynamic_fields(df, key):
    """takes a list of dynamic field tuples (key and value),
    where keys can appear multiple times, and returns a list of all
    values with the given key (in stable order)."""
    return [v for k, v in df if k == key]

def parse_runpath_dirs(elf_path, elf_dynamic_fields):
    """Parse a RUNPATH entry from an elf header bytestring.

    Returns:
      { path: unmodified string from DT_RUNPATH
      , absolute_path: fully normalized, absolute path to dir }
    """
    fields = __query_dynamic_fields(elf_dynamic_fields, b"RUNPATH")
    if fields == []:
        return []
    assert len(fields) == 1
    val = fields[0]
    origin = os.path.dirname(elf_path)
    return [{ 'path': path,
              'absolute_path': os.path.abspath(path.replace("$ORIGIN", origin)) }
            for path in val.decode().strip(":").split(":")
            if path != ""]

def parse_needed(elf_dynamic_fields):
    """Returns the list of DT_NEEDED entries for elf"""
    return [n.decode() for n in __query_dynamic_fields(elf_dynamic_fields, b"NEEDED")]


### Main utility

# cannot find dependency
LDD_MISSING = "MISSING"
# don't know how to search for dependency
LDD_UNKNOWN = "DUNNO"
# list of all errors for easy branching
LDD_ERRORS = [ LDD_MISSING, LDD_UNKNOWN ]

def _ldd(elf_cache, f, elf_path):
    """Same as `ldd` (below), except for an additional `elf_cache` argument,
    which is a dict needed for memoizing elf files that were already read.
    This is done because the elf reading operation is quite expensive
    and many files are referenced multiple times (e.g. glib.so)."""

    def search(rdirs, elf_libname):
        """search for elf_libname in the runpath dirs and return
        a result dict, or LDD_MISSING if it cannot be found"""
        res = LDD_MISSING
        for rdir in rdirs:
            potential_path = os.path.join(rdir['absolute_path'], elf_libname)
            if os.path.exists(potential_path):
                res = {
                    'item': potential_path,
                    'found_in': rdir,
                }
                break
        return res

    def recurse(search_res):
        """Unfold the subtree of ELF dependencies for a `search` result"""
        if search_res == LDD_MISSING:
            return LDD_MISSING
        else:
            # We keep all other fields in search_res the same;
            # only 'item' is replaced by the recursive result.
            # This is the part that would normally be done by fmap.
            search_res['item'] = _ldd(elf_cache, f, search_res['item'])
            return search_res

    # (GNU) ld.so resolves any symlinks before searching for dependencies
    elf_realpath = os.path.realpath(elf_path)

    # memoized uses the cache to not repeat the I/O action
    # for the same elf files (same path)
    dyn_fields = memoized(
        elf_cache, read_dynamic_fields, elf_realpath
    )
    rdirs = parse_runpath_dirs(elf_realpath, dyn_fields)
    all_needed = parse_needed(dyn_fields)

    # if there are no runpath dirs we don't know where to search
    if rdirs == []:
        needed = list_to_dict(const(LDD_UNKNOWN), all_needed)
    else:
        needed = list_to_dict(
            lambda name: recurse(search(rdirs, name)),
            all_needed
        )

    result = {
        'runpath_dirs': rdirs,
        'needed': needed
    }
    # f is applied to this layer's assembled result; the entries inside
    # 'needed' were already transformed by the recursive _ldd calls above
    return f(result)


def ldd(f, elf_path):
    """follows DT_NEEDED ELF headers for elf by searching the through DT_RUNPATH.

    DependencyInfo :
    { needed : dict(string, union(
        LDD_MISSING, LDD_UNKNOWN,
        {
            # the needed dependency
            item : a,
            # where the dependency was found in
            found_in : RunpathDir
        }))
    # all runpath directories that were searched
    , runpath_dirs : [ RunpathDir ] }

    Args:
        f: DependencyInfo -> a;
           applied to the result of each recursion level
        elf_path: path to the ELF file, either absolute or relative to the current working dir

    Returns: a
    """
    elf_cache = {}
    return _ldd(elf_cache, f, elf_path)
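
# Minimal usage sketch ("./mybinary" is a placeholder for any ELF file):
#
#   import pprint
#   tree = ldd(identity, "./mybinary")
#   pprint.pprint(tree)
#
# Passing `identity` keeps every layer as-is; a custom `f` can prune or
# summarize each layer before it is embedded into its parent's 'needed' dict.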


### Functions to pass to ldd

# Only use the current layer

def remove_matching_needed(d, re_matcher_absolute_path=None, re_matcher_path=None):
    """Destructively removes needed values from d['needed']
    if they match one of the given regex matchers.
    Doesn't remove LDD_ERRORS."""
    def pred(v):
        """return True if v matches one of the matchers"""
        if v in LDD_ERRORS:
            return False
        found_in = v['found_in']
        abs_match = re_matcher_absolute_path.match(found_in['absolute_path']) \
                    if re_matcher_absolute_path else False
        match = re_matcher_path.match(found_in['path']) \
                    if re_matcher_path else False
        return bool(abs_match or match)
    d['needed'] = {
        k: v for k, v in d['needed'].items()
        if not pred(v)
    }

def remove_matching_runpaths(d, re_matcher):
    """Destructively removes runpaths from d['runpath_dirs']
    if they match the given regex matcher."""
    d['runpath_dirs'] = [
        runp for runp in d['runpath_dirs']
        if not re_matcher.match(runp['absolute_path'])
    ]
    return d
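
# Since `remove_matching_runpaths` returns the modified layer, it composes
# directly with `ldd` (sketch; "./mybinary" is a placeholder):
#
#   nix_store = re.compile(r"/nix/store/.*")
#   ldd(lambda layer: remove_matching_runpaths(layer, nix_store), "./mybinary")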

def non_existing_runpaths(d):
    """Return a list of runpaths_dirs that do not exist in the file system."""
    return [
        runp for runp in d['runpath_dirs']
        if not os.path.exists(runp['absolute_path'])
    ]

def unused_runpaths(d):
    """Return a list of runpath_dirs that were not used to find NEEDED dependencies."""
    used = set()
    for v in d['needed'].values():
        if v not in LDD_ERRORS:
            used.add(v['found_in']['absolute_path'])
    return [
        u for u in d['runpath_dirs']
        if u['absolute_path'] not in used
    ]

# Also use the results of sub-layers

def collect_unused_runpaths(d):
    """This is like `unused_runpaths`, but it creates a deduplicated list of all unused runpaths
    for its dependencies instead of just returning them for the current layer.

    Returns:
      a dict of two fields;
      `mine` contains the unused dependencies of the current binary under scrutiny
      `others` contains a flat dict of all .sos with unused runpath entries and a list of them for each .so
    """
    used = set()
    given = set(r['absolute_path'] for r in d['runpath_dirs'])
    prev = {}
    # TODO: use `unused_runpaths` here
    for k, v in d['needed'].items():
        if v not in LDD_ERRORS:
            used.add(v['found_in']['absolute_path'])
            prev[k] = v['item']
    unused = [
        u for u in given.difference(used)
        # leave out nix storepaths
        if not u.startswith("/nix/store")
    ]

    # Each layer doesn't know its own name,
    # so we return a list of unused runpaths for this layer ('mine')
    # and a dict combining all previous layers (name to list)
    def combine_unused(deps):
        res = {}
        for name, dep in deps.items():
            res.update(dep['others'])
            res[name] = dep['mine']
        return res

    return {
        'mine': unused,
        'others': combine_unused(prev),
    }
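
# Usage sketch ("./mybinary" is a placeholder):
#
#   report = ldd(collect_unused_runpaths, "./mybinary")
#   print(report['mine'])    # unused runpath entries of the binary itself
#   print(report['others'])  # {library_name: [unused runpath entries]}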