487 lines
16 KiB
Python
Executable File
487 lines
16 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
"""rough bindings for libzephyr"""
|
|
|
|
import socket
|
|
import struct
|
|
import ctypes
|
|
import ctypes.util
|
|
import time
|
|
import sys
|
|
from ctypes import c_int, c_uint, c_ushort, c_char, c_ubyte
|
|
from ctypes import c_uint16, c_uint32
|
|
from ctypes import POINTER, c_void_p, c_char_p
|
|
from ctypes import Structure, Union, sizeof
|
|
|
|
__revision__ = "$Id$"
|
|
try:
|
|
__version__ = "%s/%s" % (__revision__.split()[3], __revision__.split()[2])
|
|
except IndexError:
|
|
__version__ = "unknown"
|
|
|
|
def print_line_or_lines(results, indent):
|
|
"""short values on same line, multi-line on later ones..."""
|
|
if len(results) == 1:
|
|
print results[0]
|
|
else:
|
|
print
|
|
for result in results:
|
|
print indent + result
|
|
|
|
def ctypes_pprint(cstruct, indent=""):
|
|
"""pretty print a ctypes Structure or Union"""
|
|
|
|
for field_name, field_ctype in cstruct._fields_:
|
|
field_value = getattr(cstruct, field_name)
|
|
print indent + field_name,
|
|
next_indent = indent + " "
|
|
pprint_name = "pprint_%s" % field_name
|
|
pformat_name = "pformat_%s" % field_name
|
|
if hasattr(cstruct, pprint_name):
|
|
# no longer used
|
|
getattr(cstruct, pprint_name)(next_indent)
|
|
elif hasattr(cstruct, pformat_name):
|
|
# counted-array and other common cases
|
|
print_line_or_lines(getattr(cstruct, pformat_name)(), next_indent)
|
|
elif hasattr(field_value, "pformat"):
|
|
# common for small compound types
|
|
print_line_or_lines(field_value.pformat(), next_indent)
|
|
elif hasattr(field_value, "pprint"):
|
|
# useful for Union selectors
|
|
field_value.pprint(next_indent)
|
|
elif hasattr(field_value, "_fields_"):
|
|
# generic recursion
|
|
print
|
|
ctypes_pprint(field_value, next_indent)
|
|
else:
|
|
# generic simple (or unknown/uninteresting) value
|
|
print field_value
|
|
|
|
class Enum(c_int):
|
|
def pformat(self):
|
|
try:
|
|
return ["%s(%d)" % (self._values_[self.value], self.value)]
|
|
except IndexError:
|
|
return ["unknown enum value(%d)" % (self.value)]
|
|
|
|
def populate_enum(cls):
|
|
"""make members for each of the enum values"""
|
|
for value, tag in enumerate(cls._values_):
|
|
setattr(cls, tag, cls(value))
|
|
|
|
# not really an enum, but we get a richer effect by treating it as one
|
|
class Enum_u16(c_uint16):
|
|
def pformat(self):
|
|
try:
|
|
return ["%s(%d)" % (self._values_[self.value], self.value)]
|
|
except IndexError:
|
|
return ["unknown enum value(%d)" % (self.value)]
|
|
|
|
class Enum_u8(c_ubyte):
|
|
def pformat(self):
|
|
try:
|
|
return ["%s(%d)" % (self._values_[self.value], self.value)]
|
|
except IndexError:
|
|
return ["unknown enum value(%d)" % (self.value)]
|
|
|
|
# POSIX socket types...
|
|
class in_addr(Structure):
|
|
_fields_ = [
|
|
("s_addr", c_uint32),
|
|
]
|
|
def pformat(self):
|
|
return [socket.inet_ntoa(struct.pack("<I", self.s_addr))]
|
|
|
|
class _U_in6_u(Union):
|
|
_fields_ = [
|
|
("u6_addr8", c_ubyte * 16),
|
|
("u6_addr16", c_uint16 * 8),
|
|
("u6_addr32", c_uint32 * 4),
|
|
]
|
|
|
|
class in6_addr(Structure):
|
|
_fields_ = [
|
|
("in6_u", _U_in6_u),
|
|
]
|
|
|
|
# XXX Ick.
|
|
if sys.platform.startswith("darwin") or sys.platform.lower().find("bsd") >= 0:
|
|
sa_has_len = True
|
|
elif sys.platform.startswith("linux"):
|
|
sa_has_len = False
|
|
else:
|
|
# ??
|
|
sa_has_len = False
|
|
|
|
if sa_has_len:
|
|
af_base_type = Enum_u8
|
|
else:
|
|
af_base_type = Enum_u16
|
|
|
|
class AF_(af_base_type):
|
|
_socket_af = dict([(v,n) for n,v in socket.__dict__.items() if n.startswith("AF_")])
|
|
_values_ = [_socket_af.get(k, "unknown address family") for k in range(min(_socket_af), max(_socket_af)+1)]
|
|
|
|
populate_enum(AF_)
|
|
|
|
def maybe_add_len_field(len_name, other_fields):
|
|
if sa_has_len:
|
|
return [(len_name, c_ubyte)] + other_fields
|
|
else:
|
|
return other_fields
|
|
|
|
class sockaddr(Structure):
|
|
_fields_ = maybe_add_len_field("sa_len", [
|
|
("sa_family", AF_),
|
|
("sa_data", c_char * 14),
|
|
])
|
|
|
|
class sockaddr_in(Structure):
|
|
_fields_ = maybe_add_len_field("sin_len", [
|
|
("sin_family", AF_),
|
|
("sin_port", c_uint16),
|
|
("sin_addr", in_addr),
|
|
# hack from linux - do we actually need it?
|
|
("sin_zero", c_ubyte * (sizeof(sockaddr)-sizeof(c_uint16)-sizeof(c_uint16)-sizeof(in_addr))),
|
|
])
|
|
def pformat_sin_zero(self):
|
|
return ["[ignored]"]
|
|
|
|
# RFC2553...
|
|
class sockaddr_in6(Structure):
|
|
_fields_ = maybe_add_len_field("sin6_len", [
|
|
("sin6_family", AF_),
|
|
("sin6_port", c_uint16),
|
|
("sin6_flowinfo", c_uint32),
|
|
("sin6_addr", in6_addr),
|
|
("sin6_scope_id", c_uint32),
|
|
])
|
|
|
|
# zephyr/zephyr.h
|
|
#define Z_MAXOTHERFIELDS 10 /* Max unknown fields in ZNotice_t */
|
|
Z_MAXOTHERFIELDS = 10
|
|
#define ZAUTH (ZMakeAuthentication)
|
|
#define ZCAUTH (ZMakeZcodeAuthentication)
|
|
#define ZNOAUTH ((Z_AuthProc)0)
|
|
ZNOAUTH = 0
|
|
|
|
# typedef enum {
|
|
# UNSAFE, UNACKED, ACKED, HMACK, HMCTL, SERVACK, SERVNAK, CLIENTACK, STAT
|
|
# } ZNotice_Kind_t;
|
|
# extern const char *ZNoticeKinds[9];
|
|
|
|
class ZNotice_Kind_t(Enum):
|
|
_values_ = [
|
|
"UNSAFE", "UNACKED", "ACKED", "HMACK", "HMCTL", "SERVACK", "SERVNAK", "CLIENTACK", "STAT",
|
|
]
|
|
populate_enum(ZNotice_Kind_t)
|
|
|
|
def pformat_timeval(tv_sec, tv_usec):
|
|
"""format timeval parts as seconds and human-readable time"""
|
|
try:
|
|
timestr = time.ctime(tv_sec)
|
|
except ValueError:
|
|
timestr = "invalid unix time"
|
|
if tv_usec >= 1000000 or tv_usec < 0:
|
|
# invalid usec, still treat as numbers
|
|
return ["%dsec, %dusec (bad) (%s)" % (tv_sec, tv_usec, timestr)]
|
|
return ["%d.%06dsec (%s)" % (tv_sec, tv_usec, timestr)]
|
|
|
|
# struct _ZTimeval {
|
|
class _ZTimeval(Structure):
|
|
_fields_ = [
|
|
# int tv_sec;
|
|
("tv_sec", c_int),
|
|
# int tv_usec;
|
|
("tv_usec", c_int),
|
|
# };
|
|
]
|
|
def pformat(self):
|
|
return pformat_timeval(self.tv_sec, self.tv_usec)
|
|
|
|
|
|
class _ZTimeval_Net(_ZTimeval):
|
|
"""When _ZTimeval is used in a ZUnique_Id_t, the time parts are
|
|
stored in network byte order. Handle this by faking up a different type."""
|
|
def pformat(self):
|
|
return pformat_timeval(socket.ntohl(self.tv_sec & 0xffffffff), socket.ntohl(self.tv_usec & 0xffffffff))
|
|
|
|
# typedef struct _ZUnique_Id_t {
|
|
class ZUnique_Id_t(Structure):
|
|
_fields_ = [
|
|
# struct in_addr zuid_addr;
|
|
("zuid_addr", in_addr),
|
|
# struct _ZTimeval tv;
|
|
("tv", _ZTimeval_Net),
|
|
# } ZUnique_Id_t;
|
|
]
|
|
|
|
# union {
|
|
class _U_z_sender_sockaddr(Union):
|
|
_fields_ = [
|
|
# struct sockaddr sa;
|
|
("sa", sockaddr),
|
|
# struct sockaddr_in ip4;
|
|
("ip4", sockaddr_in),
|
|
# struct sockaddr_in6 ip6;
|
|
("ip6", sockaddr_in6),
|
|
# } z_sender_sockaddr;
|
|
]
|
|
def pprint(self, indent):
|
|
print
|
|
if self.sa.sa_family.value == socket.AF_INET:
|
|
ctypes_pprint(self.ip4, indent + ".ip4:")
|
|
elif self.sa.sa_family.value == socket.AF_INET6:
|
|
ctypes_pprint(self.ip6, indent + ".ip6:")
|
|
else:
|
|
ctypes_pprint(self.sa, indent + ".sa:")
|
|
|
|
# typedef struct _ZNotice_t {
|
|
class ZNotice_t(Structure):
|
|
_fields_ = [
|
|
# char *z_packet;
|
|
("z_packet", c_char_p),
|
|
# char *z_version;
|
|
("z_version", c_char_p),
|
|
# ZNotice_Kind_t z_kind;
|
|
("z_kind", ZNotice_Kind_t),
|
|
# ZUnique_Id_t z_uid;
|
|
("z_uid", ZUnique_Id_t),
|
|
# union {
|
|
# struct sockaddr sa;
|
|
# struct sockaddr_in ip4;
|
|
# struct sockaddr_in6 ip6;
|
|
# } z_sender_sockaddr;
|
|
("z_sender_sockaddr", _U_z_sender_sockaddr),
|
|
|
|
# /* heavily deprecated: */
|
|
# #define z_sender_addr z_sender_sockaddr.ip4.sin_addr
|
|
# /* probably a bad idea?: */
|
|
# struct _ZTimeval z_time;
|
|
("z_time", _ZTimeval),
|
|
# unsigned short z_port;
|
|
("z_port", c_ushort),
|
|
# unsigned short z_charset;
|
|
("z_charset", c_ushort),
|
|
# int z_auth;
|
|
("z_auth", c_int),
|
|
# int z_checked_auth;
|
|
# TODO: fake enum, for display
|
|
("z_checked_auth", c_int),
|
|
# int z_authent_len;
|
|
("z_authent_len", c_int),
|
|
# char *z_ascii_authent;
|
|
("z_ascii_authent", c_char_p),
|
|
# char *z_class;
|
|
("z_class", c_char_p),
|
|
# char *z_class_inst;
|
|
("z_class_inst", c_char_p),
|
|
# char *z_opcode;
|
|
("z_opcode", c_char_p),
|
|
# char *z_sender;
|
|
("z_sender", c_char_p),
|
|
# char *z_recipient;
|
|
("z_recipient", c_char_p),
|
|
# char *z_default_format;
|
|
("z_default_format", c_char_p),
|
|
# char *z_multinotice;
|
|
("z_multinotice", c_char_p),
|
|
# ZUnique_Id_t z_multiuid;
|
|
("z_multiuid", ZUnique_Id_t),
|
|
# ZChecksum_t z_checksum;
|
|
("z_checksum", c_uint),
|
|
# char *z_ascii_checksum;
|
|
("z_ascii_checksum", c_char_p),
|
|
# int z_num_other_fields;
|
|
("z_num_other_fields", c_int),
|
|
# char *z_other_fields[Z_MAXOTHERFIELDS];
|
|
("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
|
|
# caddr_t z_message;
|
|
("z_message", c_char_p), # not 1980
|
|
# int z_message_len;
|
|
("z_message_len", c_int),
|
|
# int z_num_hdr_fields;
|
|
("z_num_hdr_fields", c_int),
|
|
# char **z_hdr_fields;
|
|
("z_hdr_fields", POINTER(c_char_p)),
|
|
# } ZNotice_t;
|
|
]
|
|
def pformat_z_other_fields(self):
|
|
return ["%d: %s" % (n, self.z_other_fields[n])
|
|
for n in range(Z_MAXOTHERFIELDS)]
|
|
def pformat_z_hdr_fields(self):
|
|
if not self.z_hdr_fields:
|
|
return ["NULL"]
|
|
return ["%d: %s" % (n, self.z_hdr_fields[n])
|
|
for n in range(self.z_num_hdr_fields)]
|
|
|
|
class libZephyr(object):
|
|
"""wrappers for functions in libZephyr"""
|
|
testable_funcs = [
|
|
"ZInitialize",
|
|
"ZGetFD",
|
|
"ZGetRealm",
|
|
"ZGetSender",
|
|
"Z_FormatRawHeader",
|
|
"ZParseNotice",
|
|
"ZFormatNotice",
|
|
"ZCompareUID",
|
|
"ZExpandRealm",
|
|
"ZGetCharsetString",
|
|
"ZGetCharset",
|
|
"ZCharsetToString",
|
|
"ZTransliterate",
|
|
"ZOpenPort",
|
|
"ZClosePort",
|
|
"ZMakeAscii",
|
|
"ZMakeZcode",
|
|
"ZGetDestAddr",
|
|
"ZSetFD",
|
|
"ZPending",
|
|
]
|
|
def __init__(self, library_path=None):
|
|
"""connect to the library and build the wrappers"""
|
|
if not library_path:
|
|
library_path = ctypes.util.find_library("zephyr")
|
|
self._lib = ctypes.cdll.LoadLibrary(library_path)
|
|
|
|
# grab the Zauthtype variable
|
|
self.Zauthtype = ctypes.c_int.in_dll(self._lib, 'Zauthtype').value
|
|
|
|
# generic bindings?
|
|
for funcname in self.testable_funcs:
|
|
setattr(self, funcname, getattr(self._lib, funcname))
|
|
|
|
# TODO: fix return types, caller types in a more generic way later
|
|
# (perhaps by parsing the headers or code)
|
|
# perhaps metaprogramming or decorators...
|
|
self.ZGetRealm.restype = ctypes.c_char_p
|
|
self.ZGetSender.restype = ctypes.c_char_p
|
|
|
|
# Code_t
|
|
# Z_FormatRawHeader(ZNotice_t *notice,
|
|
# char *buffer,
|
|
# int buffer_len,
|
|
# int *len,
|
|
# char **cstart,
|
|
# char **cend)
|
|
# This stuffs a notice into a buffer; cstart/cend point into the checksum in buffer
|
|
self.Z_FormatRawHeader.argtypes = [
|
|
c_void_p, # *notice
|
|
c_char_p, # *buffer
|
|
c_int, # buffer_len
|
|
POINTER(c_int), # *len
|
|
POINTER(c_char_p), # **cstart
|
|
POINTER(c_char_p), # **cend
|
|
]
|
|
|
|
# Code_t
|
|
# ZParseNotice(char *buffer,
|
|
# int len,
|
|
# ZNotice_t *notice)
|
|
self.ZParseNotice.argtypes = [
|
|
c_char_p, # *buffer
|
|
c_int, # len
|
|
POINTER(ZNotice_t), # *notice
|
|
]
|
|
|
|
# Code_t
|
|
# ZFormatNotice(register ZNotice_t *notice,
|
|
# char **buffer,
|
|
# int *ret_len,
|
|
# Z_AuthProc cert_routine)
|
|
self.ZFormatNotice.argtypes = [
|
|
POINTER(ZNotice_t), # *notice
|
|
POINTER(c_char_p), # **buffer
|
|
POINTER(c_int), # *ret_len
|
|
c_void_p, # cert_routine
|
|
]
|
|
|
|
# int
|
|
# ZCompareUID(ZUnique_Id_t *uid1,
|
|
# ZUnique_Id_t *uid2)
|
|
self.ZCompareUID.argtypes = [
|
|
POINTER(ZUnique_Id_t), # *uid1
|
|
POINTER(ZUnique_Id_t), # *uid2
|
|
]
|
|
|
|
# char *
|
|
# ZExpandRealm(realm)
|
|
# char *realm; # mmm 80's
|
|
self.ZExpandRealm.restype = c_char_p
|
|
self.ZExpandRealm.argtypes = [
|
|
c_char_p, # realm
|
|
]
|
|
|
|
# unsigned short
|
|
# ZGetCharset(char *charset)
|
|
self.ZGetCharset.restype = c_ushort
|
|
self.ZGetCharset.argtypes = [
|
|
c_char_p, # charset
|
|
]
|
|
|
|
# const char *
|
|
# ZCharsetToString(unsigned short charset)
|
|
self.ZCharsetToString.restype = c_char_p
|
|
self.ZCharsetToString.argtypes = [
|
|
c_ushort, # charset
|
|
]
|
|
|
|
# Code_t
|
|
# ZTransliterate(char *in,
|
|
# int inlen,
|
|
# char *inset,
|
|
# char *outset,
|
|
# char **out,
|
|
# int *outlen)
|
|
self.ZTransliterate.argtypes = [
|
|
c_char_p, # in
|
|
c_int, # inlnet,
|
|
c_char_p, # inset
|
|
c_char_p, # outset
|
|
POINTER(c_char_p), # out
|
|
POINTER(c_int), # outlen
|
|
]
|
|
|
|
# Code_t ZOpenPort(u_short *port)
|
|
self.ZOpenPort.argtypes = [
|
|
POINTER(c_ushort), # port
|
|
]
|
|
|
|
# const char *
|
|
# ZGetCharsetString(char *charset)
|
|
self.ZGetCharsetString.restype = c_char_p
|
|
self.ZGetCharsetString.argtypes = [
|
|
c_char_p, # charset
|
|
]
|
|
|
|
# Code_t
|
|
# ZMakeAscii(register char *ptr,
|
|
# int len,
|
|
# unsigned char *field,
|
|
# int num)
|
|
self.ZMakeAscii.argtypes = [
|
|
c_char_p, # ptr
|
|
c_int, # len
|
|
c_char_p, # field; c_uchar_p?
|
|
c_int, # num
|
|
]
|
|
|
|
# Code_t
|
|
# ZMakeZcode(register char *ptr,
|
|
# int len,
|
|
# unsigned char *field,
|
|
# int num)
|
|
self.ZMakeZcode.argtypes = [
|
|
c_char_p, # ptr
|
|
c_int, # len
|
|
c_char_p, # field; c_uchar_p?
|
|
c_int, # num
|
|
]
|
|
|
|
# struct sockaddr_in ZGetDestAddr (void) {
|
|
self.ZGetDestAddr.restype = sockaddr_in
|
|
|
|
# library-specific setup...
|
|
self.ZInitialize()
|