[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: Improved transport and module code.
From: |
gnunet |
Subject: |
[gnunet-go] branch master updated: Improved transport and module code. |
Date: |
Tue, 07 Jun 2022 09:32:34 +0200 |
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new 7a6ae8f Improved transport and module code.
7a6ae8f is described below
commit 7a6ae8f61ee7efde161db98462259ea9bbb23386
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Tue Jun 7 09:31:22 2022 +0200
Improved transport and module code.
---
.../main.go | 8 +-
src/gnunet/cmd/peer_mockup/main.go | 19 +-
src/gnunet/config/config.go | 30 ++-
src/gnunet/config/config_test.go | 19 +-
src/gnunet/config/gnunet-config.json | 119 ++++++------
src/gnunet/core/core.go | 202 ++++++++++++++++----
src/gnunet/core/core_test.go | 203 ++++++++++++++++-----
src/gnunet/core/peer.go | 49 +----
src/gnunet/core/peer_test.go | 23 ++-
src/gnunet/go.mod | 2 +
src/gnunet/go.sum | 2 +
src/gnunet/message/msg_hello.go | 2 +-
src/gnunet/modules.go | 84 ---------
src/gnunet/service/dht/blocks/hello.go | 2 +-
src/gnunet/service/dht/module.go | 18 +-
src/gnunet/service/dht/routingtable.go | 57 ++++--
src/gnunet/service/dht/routingtable_test.go | 8 +-
src/gnunet/service/gns/module.go | 18 ++
src/gnunet/service/module.go | 36 ++++
src/gnunet/service/namecache/module.go | 26 ++-
src/gnunet/service/revocation/module.go | 18 +-
src/gnunet/transport/endpoint.go | 168 ++++++++++++-----
src/gnunet/transport/reader_writer.go | 5 +-
src/gnunet/transport/transport.go | 123 +++++++------
src/gnunet/util/address.go | 21 ++-
src/gnunet/util/database.go | 4 +-
src/gnunet/util/misc.go | 80 ++++++--
src/gnunet/util/peer_id.go | 26 ++-
28 files changed, 907 insertions(+), 465 deletions(-)
diff --git a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go
b/src/gnunet/cmd/gnunet-service-dht-go/main.go
similarity index 94%
rename from src/gnunet/cmd/gnunet-service-dht-test-go/main.go
rename to src/gnunet/cmd/gnunet-service-dht-go/main.go
index cd2bd1a..3b525fd 100644
--- a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -83,16 +83,12 @@ func main() {
// instantiate core service
ctx, cancel := context.WithCancel(context.Background())
- var local *core.Peer
- if local, err = core.NewLocalPeer(config.Cfg.Local); err != nil {
- logger.Printf(logger.ERROR, "[dht] No local peer: %s\n",
err.Error())
- return
- }
var c *core.Core
- if c, err = core.NewCore(ctx, local); err != nil {
+ if c, err = core.NewCore(ctx, config.Cfg.Local); err != nil {
logger.Printf(logger.ERROR, "[dht] core failed: %s\n",
err.Error())
return
}
+ defer c.Shutdown()
// start a new DHT service
dht := dht.NewService(ctx, c)
diff --git a/src/gnunet/cmd/peer_mockup/main.go
b/src/gnunet/cmd/peer_mockup/main.go
index 58f4baf..96f62da 100644
--- a/src/gnunet/cmd/peer_mockup/main.go
+++ b/src/gnunet/cmd/peer_mockup/main.go
@@ -22,13 +22,19 @@ var (
// configuration for local node
localCfg = &config.NodeConfig{
PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
- Endpoints: []string{
- "udp:127.0.0.1:2086",
+ Endpoints: []*config.EndpointConfig{
+ {
+ ID: "local",
+ Network: "udp",
+ Address: "127.0.0.1",
+ Port: 2086,
+ TTL: 86400,
+ },
},
}
// configuration for remote node
remoteCfg = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc="
- remoteAddr = "udp:172.17.0.5:2086"
+ remoteAddr = "udp://172.17.0.5:2086"
// top-level variables used accross functions
local *core.Peer // local peer (with private key)
@@ -50,14 +56,11 @@ func main() {
flag.Parse()
// setup peer and core instances
- if local, err = core.NewLocalPeer(localCfg); err != nil {
- fmt.Println("local failed: " + err.Error())
- return
- }
- if c, err = core.NewCore(ctx, local); err != nil {
+ if c, err = core.NewCore(ctx, localCfg); err != nil {
fmt.Println("core failed: " + err.Error())
return
}
+ local = c.Peer()
if remote, err = core.NewPeer(remoteCfg); err != nil {
fmt.Println("remote failed: " + err.Error())
return
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index 41a65f0..3687a3e 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -20,6 +20,7 @@ package config
import (
"encoding/json"
+ "fmt"
"io/ioutil"
"reflect"
"regexp"
@@ -32,10 +33,26 @@ import (
// Configuration for local node
//----------------------------------------------------------------------
+// EndpointConfig holds parameters for local network listeners.
+type EndpointConfig struct {
+ ID string `json:"id"` // endpoint identifier
+ Network string `json:"network"` // network protocol to use on endpoint
+ Address string `json:"address"` // address to listen on
+ Port int `json:"port"` // port for listening to network
+ TTL int `json:"ttl"` // time-to-live for address (in seconds)
+}
+
+// Addr returns an address string for endpoint configuration; it does NOT
+// handle special cases like UPNP and such.
+func (c *EndpointConfig) Addr() string {
+ return fmt.Sprintf("%s://%s:%d", c.Network, c.Address, c.Port)
+}
+
// NodeConfig holds parameters for the local node instance
type NodeConfig struct {
- PrivateSeed string `json:"privateSeed"` // Node private key seed
(base64)
- Endpoints []string `json:"endpoints"` // list of endpoints available
+ Name string `json:"name"` // (short) name for
local node
+ PrivateSeed string `json:"privateSeed"` // Node private key
seed (base64)
+ Endpoints []*EndpointConfig `json:"endpoints"` // list of endpoints
available
}
//----------------------------------------------------------------------
@@ -139,9 +156,16 @@ func ParseConfig(fileName string) (err error) {
if err != nil {
return
}
+ return ParseConfigBytes(file, true)
+}
+
+// ParseConfigBytes reads a configuration from binary data. The data is
+// a JSON-encoded content. If 'subst' is true, the configuration strings
+// are substituted.
+func ParseConfigBytes(data []byte, subst bool) (err error) {
// unmarshal to Config data structure
Cfg = new(Config)
- if err = json.Unmarshal(file, Cfg); err == nil {
+ if err = json.Unmarshal(data, Cfg); err == nil {
// process all string-based config settings and apply
// string substitutions.
applySubstitutions(Cfg, Cfg.Env)
diff --git a/src/gnunet/config/config_test.go b/src/gnunet/config/config_test.go
index 3afd3a7..d40bc19 100644
--- a/src/gnunet/config/config_test.go
+++ b/src/gnunet/config/config_test.go
@@ -20,6 +20,7 @@ package config
import (
"encoding/json"
+ "io/ioutil"
"testing"
"github.com/bfix/gospel/logger"
@@ -27,14 +28,18 @@ import (
func TestConfigRead(t *testing.T) {
logger.SetLogLevel(logger.WARN)
- if err := ParseConfig("./gnunet-config.json"); err != nil {
+
+ // read configuration file
+ data, err := ioutil.ReadFile("./gnunet-config.json")
+ if err != nil {
+ t.Fatal(err)
+ }
+ // parse configuration
+ if err := ParseConfigBytes(data, false); err != nil {
t.Fatal(err)
}
- if testing.Verbose() {
- data, err := json.Marshal(Cfg)
- if err != nil {
- t.Fatal(err)
- }
- t.Log("cfg=" + string(data))
+ // write configuration
+ if _, err = json.Marshal(Cfg); err != nil {
+ t.Fatal(err)
}
}
diff --git a/src/gnunet/config/gnunet-config.json
b/src/gnunet/config/gnunet-config.json
index 941cf21..82606d7 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -1,58 +1,65 @@
{
- "local": {
- "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
- "endpoints": [
- "r5n+ip+udp:127.0.0.1:6666"
- ]
- },
- "bootstrap": {
- "nodes": [
-
"gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654"
- ]
- },
- "environ": {
- "TMP": "/tmp",
- "RT_SYS": "${TMP}/gnunet-system-runtime"
- },
- "dht": {
- "service": {
- "socket": "${RT_SYS}/gnunet-service-dht.sock",
- "params": {
- "perm": "0770"
- }
- },
- "storage": "dht_file_store+/var/lib/gnunet/dht/store",
- "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
- },
- "gns": {
- "service": {
- "socket": "${RT_SYS}/gnunet-service-gns-go.sock",
- "params": {
- "perm": "0770"
- }
- },
- "dhtReplLevel": 10,
- "maxDepth": 250
- },
- "namecache": {
- "service": {
- "socket": "${RT_SYS}/gnunet-service-namecache.sock",
- "params": {
- "perm": "0770"
- }
- },
- "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
- },
- "revocation": {
- "service": {
- "socket": "${RT_SYS}/gnunet-service-revocation-go.sock",
- "params": {
- "perm": "0770"
- }
- },
- "storage": "redis:localhost:6397::15"
- },
- "rpc": {
- "endpoint": "tcp:127.0.0.1:80"
- }
+ "bootstrap": {
+ "nodes": [
+
"gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654"
+ ]
+ },
+ "local": {
+ "name": "ygng",
+ "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
+ "endpoints": [
+ {
+ "id": "test",
+ "network": "ip+udp",
+ "address": "upnp:192.168.178.1",
+ "port": 6666,
+ "ttl": 86400
+ }
+ ]
+ },
+ "environ": {
+ "TMP": "/tmp",
+ "RT_SYS": "${TMP}/gnunet-system-runtime"
+ },
+ "dht": {
+ "service": {
+ "socket": "${RT_SYS}/gnunet-service-dht.sock",
+ "params": {
+ "perm": "0770"
+ }
+ },
+ "storage": "dht_file_store+/var/lib/gnunet/dht/store",
+ "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
+ },
+ "gns": {
+ "service": {
+ "socket": "${RT_SYS}/gnunet-service-gns-go.sock",
+ "params": {
+ "perm": "0770"
+ }
+ },
+ "dhtReplLevel": 10,
+ "maxDepth": 250
+ },
+ "namecache": {
+ "service": {
+ "socket": "${RT_SYS}/gnunet-service-namecache.sock",
+ "params": {
+ "perm": "0770"
+ }
+ },
+ "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
+ },
+ "revocation": {
+ "service": {
+ "socket": "${RT_SYS}/gnunet-service-revocation-go.sock",
+ "params": {
+ "perm": "0770"
+ }
+ },
+ "storage": "redis:localhost:6397::15"
+ },
+ "rpc": {
+ "endpoint": "tcp:127.0.0.1:80"
+ }
}
\ No newline at end of file
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go
index c3bf355..7582891 100644
--- a/src/gnunet/core/core.go
+++ b/src/gnunet/core/core.go
@@ -20,16 +20,34 @@ package core
import (
"context"
+ "errors"
+ "gnunet/config"
"gnunet/message"
"gnunet/service/dht/blocks"
"gnunet/transport"
"gnunet/util"
"net"
+ "strings"
"time"
+)
- "github.com/bfix/gospel/data"
+//----------------------------------------------------------------------
+// Core-related error codes
+var (
+ ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP")
+ ErrCoreNoEndpAddr = errors.New("no endpoint for address")
)
+//----------------------------------------------------------------------
+// EndpointRef is a reference to an endpoint instance managed by core.
+type EndpointRef struct {
+ id string // endpoint identifier in configuration
+ ep transport.Endpoint // reference to endpoint
+ addr *util.Address // public endpoint address
+ upnpId string // UPNP identifier (empty if unused)
+}
+
+//----------------------------------------------------------------------
// Core service
type Core struct {
// local peer instance
@@ -41,31 +59,91 @@ type Core struct {
// reference to transport implementation
trans *transport.Transport
- // registered listeners
+ // registered signal listeners
listeners map[string]*Listener
// list of known peers with addresses
peers *util.PeerAddrList
+
+ // List of registered endpoints
+ endpoints map[string]*EndpointRef
}
//----------------------------------------------------------------------
// NewCore creates and runs a new core instance.
-func NewCore(ctx context.Context, local *Peer) (c *Core, err error) {
+func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err
error) {
+
+ // instantiate peer
+ var peer *Peer
+ if peer, err = NewLocalPeer(node); err != nil {
+ return
+ }
// create new core instance
incoming := make(chan *transport.TransportMessage)
c = &Core{
- local: local,
+ local: peer,
incoming: incoming,
listeners: make(map[string]*Listener),
- trans: transport.NewTransport(ctx, incoming),
+ trans: transport.NewTransport(ctx, node.Name, incoming),
peers: util.NewPeerAddrList(),
+ endpoints: make(map[string]*EndpointRef),
}
// add all local peer endpoints to transport.
- for _, addr := range local.addrList {
- if _, err = c.trans.AddEndpoint(ctx, addr); err != nil {
+ for _, epCfg := range node.Endpoints {
+ var (
+ upnpId string // upnp identifier
+ local *util.Address // local address
+ remote *util.Address // remote address
+ ep transport.Endpoint // endpoint reference
+ )
+ // handle special addresses:
+ if strings.HasPrefix(epCfg.Address, "upnp:") {
+ // don't allow dynamic port assignment
+ if epCfg.Port == 0 {
+ err = ErrCoreNoUpnpDyn
+ return
+ }
+ // handle UPNP port forwarding
+ protocol := transport.EpProtocol(epCfg.Network)
+ var localA, remoteA string
+ if upnpId, remoteA, localA, err =
c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil {
+ return
+ }
+ // parse local and remote addresses
+ if local, err = util.ParseAddress(epCfg.Network + "://"
+ localA); err != nil {
+ return
+ }
+ if remote, err = util.ParseAddress(epCfg.Network +
"://" + remoteA); err != nil {
+ return
+ }
+ } else {
+ // direct address specification:
+ if local, err = util.ParseAddress(epCfg.Addr()); err !=
nil {
+ return
+ }
+ remote = local
+ upnpId = ""
+ }
+ // add endpoint for address
+ if ep, err = c.trans.AddEndpoint(ctx, local); err != nil {
return
}
+ // if port is set to 0, replace it with port assigned
dynamically.
+ // only applies to direct listening addresses!
+ if epCfg.Port == 0 && local == remote {
+ addr := ep.Address()
+ if remote, err = util.ParseAddress(addr.Network() +
"://" + addr.String()); err != nil {
+ return
+ }
+ }
+ // save endpoint reference
+ c.endpoints[epCfg.ID] = &EndpointRef{
+ id: epCfg.ID,
+ ep: ep,
+ addr: remote,
+ upnpId: upnpId,
+ }
}
// run message pump
go func() {
@@ -77,32 +155,31 @@ func NewCore(ctx context.Context, local *Peer) (c *Core,
err error) {
var ev *Event
// inspect message for peer state events
- m, err := tm.Message()
- if err == nil {
- switch msg := m.(type) {
- case *message.HelloMsg:
- // keep peer addresses
- for _, addr := range
msg.Addresses {
- a := &util.Address{
- Netw:
addr.Transport,
- Address:
addr.Address,
- Expires:
addr.ExpireOn,
- }
- c.Learn(ctx,
msg.PeerID, a)
+ switch msg := tm.Msg.(type) {
+ case *message.HelloMsg:
+ // keep peer addresses
+ for _, addr := range msg.Addresses {
+ a := &util.Address{
+ Netw: addr.Transport,
+ Address: addr.Address,
+ Expires: addr.ExpireOn,
}
- // generate EV_CONNECT event
- ev = new(Event)
- ev.ID = EV_CONNECT
- ev.Peer = tm.Peer
- ev.Msg = msg
- c.dispatch(ev)
+ c.Learn(ctx, msg.PeerID, a)
+ }
+ // generate EV_CONNECT event
+ ev = &Event{
+ ID: EV_CONNECT,
+ Peer: tm.Peer,
+ Msg: msg,
}
+ c.dispatch(ev)
}
// generate EV_MESSAGE event
- ev = new(Event)
- ev.ID = EV_MESSAGE
- ev.Peer = tm.Peer
- ev.Msg, _ = tm.Message()
+ ev = &Event{
+ ID: EV_MESSAGE,
+ Peer: tm.Peer,
+ Msg: tm.Msg,
+ }
c.dispatch(ev)
// wait for termination
@@ -114,29 +191,63 @@ func NewCore(ctx context.Context, local *Peer) (c *Core,
err error) {
return
}
+// Shutdown all core-related processes.
+func (c *Core) Shutdown() {
+ c.trans.Shutdown()
+ c.local.Shutdown()
+}
+
//----------------------------------------------------------------------
// Send is a function that allows the local peer to send a protocol
// message to a remote peer.
func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg
message.Message) error {
- // TODO: select best endpoint protocol for transport; now fixed to UDP
- netw := "udp"
- addr := c.peers.Get(peer.String(), netw)
- payload, err := data.Marshal(msg)
- if err != nil {
- return err
+ // TODO: select best endpoint protocol for transport; now fixed to
IP+UDP
+ netw := "ip+udp"
+ addrs := c.peers.Get(peer.String(), netw)
+ if len(addrs) == 0 {
+ return ErrCoreNoEndpAddr
}
- tm := transport.NewTransportMessage(c.PeerID(), payload)
+ // TODO: select best address; currently selects first
+ addr := addrs[0]
+
+ // select best endpoint for transport
+ var ep transport.Endpoint
+ for _, epCfg := range c.endpoints {
+ if epCfg.addr.Network() == netw {
+ if ep == nil {
+ ep = epCfg.ep
+ }
+ // TODO: compare endpoints, select better one:
+ // if ep.Better(epCfg.ep) {
+ // ep = epCfg.ep
+ // }
+ }
+ }
+ // check we have an endpoint to send on
+ if ep == nil {
+ return ErrCoreNoEndpAddr
+ }
+ // assemble transport message
+ tm := transport.NewTransportMessage(c.PeerID(), msg)
+ // send on transport
return c.trans.Send(ctx, addr, tm)
}
// Learn a (new) address for peer
func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr
*util.Address) (err error) {
if c.peers.Add(peer.String(), addr) == 1 {
+ // we added a previously unknown peer: send a HELLO
+
+ // collect endpoint addresses
+ addrList := make([]*util.Address, 0)
+ for _, epRef := range c.endpoints {
+ addrList = append(addrList, epRef.addr)
+ }
// new peer id: send HELLO message to newly added peer
node := c.local
var hello *blocks.HelloBlock
- hello, err = node.HelloData(time.Hour)
+ hello, err = node.HelloData(time.Hour, addrList)
if err != nil {
return
}
@@ -150,11 +261,28 @@ func (c *Core) Learn(ctx context.Context, peer
*util.PeerID, addr *util.Address)
return
}
+// Addresses returns the list of listening endpoint addresses
+func (c *Core) Addresses() (list []*util.Address, err error) {
+ for _, epRef := range c.endpoints {
+ list = append(list, epRef.addr)
+ }
+ return
+}
+
+//----------------------------------------------------------------------
+
+// Peer returns the local peer
+func (c *Core) Peer() *Peer {
+ return c.local
+}
+
// PeerID returns the peer id of the local node.
func (c *Core) PeerID() *util.PeerID {
return c.local.GetID()
}
+//----------------------------------------------------------------------
+
// TryConnect is a function which allows the local peer to attempt the
// establishment of a connection to another peer using an address.
// When the connection attempt is successful, information on the new
diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go
index 102abf1..d8a6277 100644
--- a/src/gnunet/core/core_test.go
+++ b/src/gnunet/core/core_test.go
@@ -20,23 +20,149 @@ package core
import (
"context"
+ "encoding/hex"
"gnunet/config"
"gnunet/util"
"testing"
"time"
)
-var (
- peer1Cfg = &config.NodeConfig{
- PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
- Endpoints: []string{"udp://127.0.0.1:20861"},
+//----------------------------------------------------------------------
+// Two node GNUnet (smallest and simplest network)
+//----------------------------------------------------------------------
+
+// TestCoreSimple test a two node network
+func TestCoreSimple(t *testing.T) {
+
+ var (
+ peer1Cfg = &config.NodeConfig{
+ Name: "p1",
+ PrivateSeed:
"iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
+ Endpoints: []*config.EndpointConfig{
+ {
+ ID: "p1",
+ Network: "ip+udp",
+ Address: "127.0.0.1",
+ Port: 0,
+ TTL: 86400,
+ },
+ },
+ }
+
+ peer2Cfg = &config.NodeConfig{
+ Name: "p2",
+ PrivateSeed:
"Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
+ Endpoints: []*config.EndpointConfig{
+ {
+ ID: "p2",
+ Network: "ip+udp",
+ Address: "127.0.0.1",
+ Port: 20862,
+ TTL: 86400,
+ },
+ },
+ }
+ )
+
+ // setup execution context
+ ctx, cancel := context.WithCancel(context.Background())
+ defer func() {
+ cancel()
+ time.Sleep(time.Second)
+ }()
+
+ // create and run nodes
+ node1, err := NewTestNode(t, ctx, peer1Cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ node2, err := NewTestNode(t, ctx, peer2Cfg)
+ if err != nil {
+ t.Fatal(err)
}
- peer2Cfg = &config.NodeConfig{
- PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
- Endpoints: []string{"udp://127.0.0.1:20862"},
+ // learn peer addresses (triggers HELLO)
+ list, err := node2.core.Addresses()
+ if err != nil {
+ t.Fatal(err)
}
-)
+ for _, addr := range list {
+ node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
+ }
+
+ // wait for 5 seconds
+ time.Sleep(5 * time.Second)
+}
+
+//----------------------------------------------------------------------
+// Two node GNUnet both running locally, but exchanging messages over
+// the internet (UPNP router required).
+//----------------------------------------------------------------------
+
+// TestCoreUPNP tests a two-node network using UPnP port forwarding
+func TestCoreUPNP(t *testing.T) {
+
+ // configuration data
+ var (
+ peer1Cfg = &config.NodeConfig{
+ Name: "p1",
+ PrivateSeed:
"iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
+ Endpoints: []*config.EndpointConfig{
+ {
+ ID: "p1",
+ Network: "ip+udp",
+ Address: "upnp:",
+ Port: 2086,
+ TTL: 86400,
+ },
+ },
+ }
+ peer2Cfg = &config.NodeConfig{
+ Name: "p2",
+ PrivateSeed:
"Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
+ Endpoints: []*config.EndpointConfig{
+ {
+ ID: "p2",
+ Network: "ip+udp",
+ Address: "upnp:",
+ Port: 1080,
+ TTL: 86400,
+ },
+ },
+ }
+ )
+
+ // setup execution context
+ ctx, cancel := context.WithCancel(context.Background())
+ defer func() {
+ cancel()
+ time.Sleep(time.Second)
+ }()
+
+ // create and run nodes
+ node1, err := NewTestNode(t, ctx, peer1Cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer node1.Shutdown()
+ node2, err := NewTestNode(t, ctx, peer2Cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer node2.Shutdown()
+
+ // learn peer addresses (triggers HELLO)
+ list, err := node2.core.Addresses()
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, addr := range list {
+ node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
+ }
+
+ // sleep a bit
+ time.Sleep(3 * time.Second)
+}
//----------------------------------------------------------------------
// create and run a node with given spec
@@ -50,9 +176,15 @@ type TestNode struct {
addr *util.Address
}
+func (n *TestNode) Shutdown() {
+ n.core.Shutdown()
+}
+
func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr
*util.Address) {
n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(),
peer.String())
- n.core.Learn(ctx, peer, addr)
+ if err := n.core.Learn(ctx, peer, addr); err != nil {
+ n.t.Log("Learn: " + err.Error())
+ }
}
func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig)
(node *TestNode, err error) {
@@ -62,18 +194,25 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg
*config.NodeConfig) (nod
node.t = t
node.id = util.NextID()
+ // create core service
+ if node.core, err = NewCore(ctx, cfg); err != nil {
+ return
+ }
+ node.peer = node.core.Peer()
+
// create peer object
if node.peer, err = NewLocalPeer(cfg); err != nil {
return
}
t.Logf("[%d] Node %s starting", node.id, node.peer.GetID())
+ t.Logf("[%d] --> %s", node.id,
hex.EncodeToString(node.peer.GetID().Key))
- // create core service
- if node.core, err = NewCore(ctx, node.peer); err != nil {
- return
+ list, err := node.core.Addresses()
+ if err != nil {
+ t.Fatal(err)
}
- for _, addr := range node.core.trans.Endpoints() {
- s := addr.Network() + ":" + addr.String()
+ for _, addr := range list {
+ s := addr.Network() + "://" + addr.String()
if node.addr, err = util.ParseAddress(s); err != nil {
continue
}
@@ -82,7 +221,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg
*config.NodeConfig) (nod
// register as event listener
incoming := make(chan *Event)
- node.core.Register("test", NewListener(incoming, nil))
+ node.core.Register(cfg.Name, NewListener(incoming, nil))
// heart beat
tick := time.NewTicker(5 * time.Minute)
@@ -100,6 +239,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg
*config.NodeConfig) (nod
t.Logf("[%d] <<< Peer %s diconnected",
node.id, ev.Peer)
case EV_MESSAGE:
t.Logf("[%d] <<< Msg from %s of type
%d", node.id, ev.Peer, ev.Msg.Header().MsgType)
+ t.Logf("[%d] <<< --> %s", node.id,
ev.Msg.String())
}
// handle termination signal
@@ -115,36 +255,3 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg
*config.NodeConfig) (nod
}()
return
}
-
-//----------------------------------------------------------------------
-// Two node GNUnet (smallest and simplest network)
-//----------------------------------------------------------------------
-
-// TestCoreSimple test a two node network
-func TestCoreSimple(t *testing.T) {
-
- // setup execution context
- ctx, cancel := context.WithCancel(context.Background())
- defer func() {
- cancel()
- time.Sleep(time.Second)
- }()
-
- // create and run nodes
- node1, err := NewTestNode(t, ctx, peer1Cfg)
- if err != nil {
- t.Fatal(err)
- }
- node2, err := NewTestNode(t, ctx, peer2Cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- // learn peer addresses (triggers HELLO)
- for _, addr := range node2.core.trans.Endpoints() {
- node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
- }
-
- // wait for 5 seconds
- time.Sleep(5 * time.Second)
-}
diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go
index 2b6fe74..86ed43a 100644
--- a/src/gnunet/core/peer.go
+++ b/src/gnunet/core/peer.go
@@ -48,7 +48,6 @@ type Peer struct {
prv *ed25519.PrivateKey // node private key (long-term
signing key)
pub *ed25519.PublicKey // node public key (=identifier)
idString string // node identifier as string
- addrList []*util.Address // list of addresses associated with
node
ephPrv *ed25519.PrivateKey // ephemeral signing key
ephMsg *message.EphemeralKeyMsg // ephemeral signing key message
}
@@ -73,16 +72,6 @@ func NewLocalPeer(cfg *config.NodeConfig) (p *Peer, err
error) {
if err != nil {
return
}
- // set the endpoint addresses for local node
- p.addrList = make([]*util.Address, len(cfg.Endpoints))
- var addr *util.Address
- for i, a := range cfg.Endpoints {
- if addr, err = util.ParseAddress(a); err != nil {
- return
- }
- addr.Expires = util.NewAbsoluteTime(time.Now().Add(12 *
time.Hour))
- p.addrList[i] = addr
- }
return
}
@@ -98,36 +87,24 @@ func NewPeer(peerID string) (p *Peer, err error) {
p.prv = nil
p.pub = ed25519.NewPublicKeyFromBytes(data)
p.idString = util.EncodeBinaryToString(p.pub.Bytes())
- p.addrList = make([]*util.Address, 0)
return
}
+// Shutdown peer-related processes.
+func (p *Peer) Shutdown() {}
+
//----------------------------------------------------------------------
//----------------------------------------------------------------------
-// Address returns a peer address for the given transport protocol
-func (p *Peer) Address(transport string) *util.Address {
- for _, addr := range p.addrList {
- // skip expired entries
- if addr.Expires.Expired() {
- continue
- }
- // filter by transport protocol
- if len(transport) > 0 && transport != addr.Netw {
- continue
- }
- return addr
- }
- return nil
-}
-
-// HelloData returns the current HELLO data for the peer
-func (p *Peer) HelloData(ttl time.Duration) (h *blocks.HelloBlock, err error) {
+// HelloData returns the current HELLO data for the peer. The list of listening
+// endpoint addresses are passed in from core to reflect the actual active
+// endpoints.
+func (p *Peer) HelloData(ttl time.Duration, a []*util.Address) (h
*blocks.HelloBlock, err error) {
// assemble HELLO data
h = new(blocks.HelloBlock)
h.PeerID = p.GetID()
h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl))
- h.SetAddresses(p.addrList)
+ h.SetAddresses(a)
// sign data
err = h.Sign(p.prv)
@@ -171,16 +148,6 @@ func (p *Peer) GetIDString() string {
return p.idString
}
-// GetAddressList returns a list of addresses associated with this peer.
-func (p *Peer) GetAddressList() []*util.Address {
- return p.addrList
-}
-
-// AddAddress adds a new address for a node.
-func (p *Peer) AddAddress(a *util.Address) {
- p.addrList = append(p.addrList, a)
-}
-
// Sign a message with the (long-term) private key.
func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) {
if p.prv == nil {
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go
index 28b328f..70a5c5f 100644
--- a/src/gnunet/core/peer_test.go
+++ b/src/gnunet/core/peer_test.go
@@ -21,6 +21,7 @@ package core
import (
"gnunet/config"
"gnunet/service/dht/blocks"
+ "gnunet/util"
"testing"
"time"
)
@@ -29,8 +30,13 @@ import (
var (
cfg = &config.NodeConfig{
PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
- Endpoints: []string{
- "r5n+ip+udp://127.0.0.1:6666",
+ Endpoints: []*config.EndpointConfig{
+ {
+ ID: "test",
+ Network: "r5n+ip+udp",
+ Address: "127.0.0.1",
+ Port: 6666,
+ },
},
}
TTL = 6 * time.Hour
@@ -44,8 +50,17 @@ func TestPeerHello(t *testing.T) {
t.Fatal(err)
}
- // get HELLO data for the node
- h, err := node.HelloData(TTL)
+ // get HELLO data for the node:
+ // This hack will only work for direct listening addresses
+ addrList := make([]*util.Address, 0)
+ for _, epRef := range cfg.Endpoints {
+ addr, err := util.ParseAddress(epRef.Addr())
+ if err != nil {
+ t.Fatal(err)
+ }
+ addrList = append(addrList, addr)
+ }
+ h, err := node.HelloData(TTL, addrList)
// convert to URL and back
u := h.URL()
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index ad203ca..eb17b30 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -15,9 +15,11 @@ require (
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f //
indirect
+ github.com/huin/goupnp v1.0.0 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+ golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index f2baf8e..17c2d00 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -11,6 +11,7 @@ github.com/go-sql-driver/mysql v1.6.0
h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
github.com/go-sql-driver/mysql v1.6.0/go.mod
h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod
h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
github.com/huin/goupnp v1.0.0/go.mod
h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod
h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/mattn/go-sqlite3 v1.14.13
h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I=
@@ -55,6 +56,7 @@ golang.org/x/text v0.3.3/go.mod
h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2
h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0=
diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go
index 18fe9d5..25ef98a 100644
--- a/src/gnunet/message/msg_hello.go
+++ b/src/gnunet/message/msg_hello.go
@@ -59,7 +59,7 @@ func NewHelloAddress(a *util.Address) *HelloAddress {
// String returns a human-readable representation of the message.
func (a *HelloAddress) String() string {
return fmt.Sprintf("Address{%s,expire=%s}",
- util.AddressString(a.Transport, a.Address), a.ExpireOn)
+ util.URI(a.Transport, a.Address), a.ExpireOn)
}
// HelloMsg is a message send by peers to announce their presence
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
deleted file mode 100644
index e47699f..0000000
--- a/src/gnunet/modules.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// This file is part of gnunet-go, a GNUnet-implementation in Golang.
-// Copyright (C) 2019, 2020 Bernd Fix >Y<
-//
-// gnunet-go is free software: you can redistribute it and/or modify it
-// under the terms of the GNU Affero General Public License as published
-// by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// gnunet-go is distributed in the hope that it will be useful, but
-// WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-// Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-// SPDX-License-Identifier: AGPL3.0-or-later
-
-//======================================================================
-// Standalone (all-in-one) implementation of GNUnet:
-// -------------------------------------------------
-// Instead of running GNUnet services like GNS or DHT in separate
-// processes communicating (exchanging messages) with each other over
-// Unix Domain Sockets, the standalone implementation combines all
-// service modules into a single binary running go-routines to
-// concurrently performing their tasks.
-//======================================================================
-
-package gnunet
-
-import (
- "gnunet/service/dht"
- "gnunet/service/gns"
- "gnunet/service/namecache"
- "gnunet/service/revocation"
- "net/rpc"
-)
-
-// Instances holds a list of all GNUnet service modules
-type Instances struct {
- GNS *gns.Module
- Namecache *namecache.NamecacheModule
- DHT *dht.Module
- Revocation *revocation.Module
-}
-
-// Register modules for JSON-RPC
-func (inst Instances) Register() {
- rpc.Register(inst.GNS)
- rpc.Register(inst.Namecache)
- rpc.Register(inst.DHT)
- rpc.Register(inst.Revocation)
-}
-
-// Local reference to instance list. The list is initialized
-// by core.
-var (
- Modules Instances
-)
-
-/* TODO: implement
-// Initialize instance list and link module functions as required.
-// This function is called by core on start-up.
-func Init(ctx context.Context) {
-
- // Namecache (no calls to other modules)
- Modules.Namecache = namecache.NewModule(ctx, c)
-
- // DHT (no calls to other modules)
- Modules.DHT = dht.NewModule(ctx, c)
-
- // Revocation (no calls to other modules)
- Modules.Revocation = revocation.NewModule(ctx, c)
-
- // GNS (calls Namecache, DHT and Identity)
- gns := gns.NewModule(ctx, c)
- Modules.GNS = gns
- gns.LookupLocal = Modules.Namecache.Get
- gns.StoreLocal = Modules.Namecache.Put
- gns.LookupRemote = Modules.DHT.Get
- gns.RevocationQuery = Modules.Revocation.Query
- gns.RevocationRevoke = Modules.Revocation.Revoke
-}
-*/
diff --git a/src/gnunet/service/dht/blocks/hello.go
b/src/gnunet/service/dht/blocks/hello.go
index 77fc2ae..eb3bf2a 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -177,7 +177,7 @@ func (h *HelloBlock) URL() string {
if i > 0 {
u += "&"
}
- u += url.QueryEscape(a.String())
+ u += url.QueryEscape(a.URI())
}
return u
}
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 5f04d54..3339aa2 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -81,7 +81,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module)
{
//----------------------------------------------------------------------
-// Get a block from the DHT
+// Get a block from the DHT ["dht:get"]
func (nc *Module) Get(ctx context.Context, query blocks.Query) (block
blocks.Block, err error) {
// check if we have the requested block in cache or permanent storage.
@@ -100,7 +100,7 @@ func (nc *Module) Get(ctx context.Context, query
blocks.Query) (block blocks.Blo
return nil, nil
}
-// Put a block into the DHT
+// Put a block into the DHT ["dht:put"]
func (nc *Module) Put(ctx context.Context, key blocks.Query, block
blocks.Block) error {
return nil
}
@@ -142,6 +142,20 @@ func (m *Module) heartbeat(ctx context.Context) {
//----------------------------------------------------------------------
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+ // add exported functions from module
+ fcn["dht:get"] = m.Get
+ fcn["dht:put"] = m.Put
+}
+
+// Import functions
+func (m *Module) Import(fcm map[string]any) {
+ // nothing to import now.
+}
+
+//----------------------------------------------------------------------
+
// RPC returns the route and handler function for a JSON-RPC request
func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) {
return "/gns/", func(wrt http.ResponseWriter, req *http.Request) {
diff --git a/src/gnunet/service/dht/routingtable.go
b/src/gnunet/service/dht/routingtable.go
index 0078b71..2933e6a 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -101,7 +101,7 @@ type RoutingTable struct {
ref *PeerAddress // reference address for distance
buckets []*Bucket // list of buckets
list map[*PeerAddress]struct{} // keep list of peers
- rwlock sync.RWMutex // lock for write operations
+ mtx sync.RWMutex // lock for write operations
l2nse float64 // log2 of estimated network size
inProcess bool // flag if Process() is running
}
@@ -131,8 +131,8 @@ func NewRoutingTable(ref *PeerAddress) *RoutingTable {
// Returns true if the entry was added, false otherwise.
func (rt *RoutingTable) Add(p *PeerAddress) bool {
// ensure one write and no readers
- rt.rwlock.Lock()
- defer rt.rwlock.Unlock()
+ rt.lock(false)
+ defer rt.unlock(false)
// check if peer is already known
if _, ok := rt.list[p]; ok {
@@ -153,8 +153,8 @@ func (rt *RoutingTable) Add(p *PeerAddress) bool {
// Returns true if the entry was removed, false otherwise.
func (rt *RoutingTable) Remove(p *PeerAddress) bool {
// ensure one write and no readers
- rt.rwlock.Lock()
- defer rt.rwlock.Unlock()
+ rt.lock(false)
+ defer rt.unlock(false)
// compute distance (bucket index) and remove entry from bucket
_, idx := p.Distance(rt.ref)
@@ -170,10 +170,15 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool {
//----------------------------------------------------------------------
// Process a function f in the locked context of a routing table
-func (rt *RoutingTable) Process(f func() error) error {
- // ensure one write and no readers
- rt.rwlock.Lock()
- defer rt.rwlock.Unlock()
+func (rt *RoutingTable) Process(f func() error, readonly bool) error {
+ // handle locking
+ rt.lock(readonly)
+ rt.inProcess = true
+ defer func() {
+ rt.inProcess = false
+ rt.unlock(readonly)
+ }()
+ // call function in locked context (lock()/unlock() are no-ops meanwhile)
return f()
}
@@ -184,8 +189,8 @@ func (rt *RoutingTable) Process(f func() error) error {
// SelectClosestPeer for a given peer address and bloomfilter.
func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter)
(n *PeerAddress) {
// no writer allowed
- rt.rwlock.RLock()
- defer rt.rwlock.RUnlock()
+ rt.mtx.RLock()
+ defer rt.mtx.RUnlock()
// find closest address
var dist *math.Int
@@ -204,8 +209,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress,
bf *PeerBloomFilter) (
// included in the bloomfilter)
func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress {
// no writer allowed
- rt.rwlock.RLock()
- defer rt.rwlock.RUnlock()
+ rt.mtx.RLock()
+ defer rt.mtx.RUnlock()
// select random entry from list
if size := len(rt.list); size > 0 {
@@ -279,11 +284,35 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
}
}
return nil
- }); err != nil {
+ }, false); err != nil {
logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error())
}
}
+//----------------------------------------------------------------------
+
+// lock with given mode (if not in processing function)
+func (rt *RoutingTable) lock(readonly bool) {
+ if !rt.inProcess {
+ if readonly {
+ rt.mtx.RLock()
+ } else {
+ rt.mtx.Lock()
+ }
+ }
+}
+
+// unlock with given mode (if not in processing function)
+func (rt *RoutingTable) unlock(readonly bool) {
+ if !rt.inProcess {
+ if readonly {
+ rt.mtx.RUnlock()
+ } else {
+ rt.mtx.Unlock()
+ }
+ }
+}
+
//======================================================================
// Routing table buckets
//======================================================================
diff --git a/src/gnunet/service/dht/routingtable_test.go
b/src/gnunet/service/dht/routingtable_test.go
index 659f9d4..33c4b7f 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -45,8 +45,12 @@ type Entry struct {
var (
cfg = &config.NodeConfig{
PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
- Endpoints: []string{
- "r5n+ip+udp://127.0.0.1:6666",
+ Endpoints: []*config.EndpointConfig{
+ {
+ Network: "r5n+ip+udp",
+ Address: "127.0.0.1",
+ Port: 6666,
+ },
},
}
)
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 4878aa0..93f9bca 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -98,6 +98,7 @@ type Module struct {
RevocationRevoke func(ctx context.Context, rd *revocation.RevData)
(success bool, err error)
}
+// NewModule instantiates a new GNS module.
func NewModule(ctx context.Context, c *core.Core) (m *Module) {
m = &Module{
ModuleImpl: *service.NewModuleImpl(),
@@ -128,6 +129,23 @@ func (m *Module) event(ctx context.Context, ev
*core.Event) {
//----------------------------------------------------------------------
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+ // add exported functions from module
+}
+
+// Import functions
+func (m *Module) Import(fcn map[string]any) {
+ // resolve imports from other modules
+ m.LookupLocal = fcn["namecache:get"].(func(ctx context.Context, query
*blocks.GNSQuery) (*blocks.GNSBlock, error))
+ m.StoreLocal = fcn["namecache:put"].(func(ctx context.Context, query
*blocks.GNSQuery, block *blocks.GNSBlock) error)
+ m.LookupRemote = fcn["dht:get"].(func(ctx context.Context, query
blocks.Query) (blocks.Block, error))
+ m.RevocationQuery = fcn["rev:query"].(func(ctx context.Context, zkey
*crypto.ZoneKey) (valid bool, err error))
+ m.RevocationRevoke = fcn["rev:revoke"].(func(ctx context.Context, rd
*revocation.RevData) (success bool, err error))
+}
+
+//----------------------------------------------------------------------
+
// Resolve a GNS name with multiple labels. If pkey is not nil, the name
// is interpreted as "relative to current zone".
func (m *Module) Resolve(
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go
index 5f95975..4109f16 100644
--- a/src/gnunet/service/module.go
+++ b/src/gnunet/service/module.go
@@ -26,7 +26,43 @@ import (
)
// Module is an interface for GNUnet service modules (workers).
+//
+// Modules can call other GNUnet services; these services can be used by
+// sending messages to the respective service socket (the default way) or by
+// calling the module functions directly (if the other module is compiled
+// along with the calling module into one binary). The latter method requires
+// calls to m.Export() and m.Import() to link the modules together (see
+// example):
+//
+// // create module instances
+// gnsMod = gns.NewModule(ctx, core)
+// dhtMod = dht.NewModule(ctx, core)
+// ncMod = namecache.NewModule(ctx, core)
+// revMod = revocation.NewModule(ctx, core)
+//
+// // export module functions
+// fcn := make(map[string]any)
+// gnsMod.Export(fcn)
+// dhtMod.Export(fcn)
+// ncMod.Export(fcn)
+// revMod.Export(fcn)
+//
+// // import (link) module functions
+// gnsMod.Import(fcn)
+// dhtMod.Import(fcn)
+// ncMod.Import(fcn)
+// revMod.Import(fcn)
+//
+// Exported and imported module functions are identified by the name defined in the
+// Export() function. Import() functions that access functions in other modules
+// need to use the same name for linking.
type Module interface {
+ // Export functions by name
+ Export(map[string]any)
+
+ // Import functions by name
+ Import(map[string]any)
+
// RPC returns the route and handler for JSON-RPC requests
RPC() (string, func(http.ResponseWriter, *http.Request))
diff --git a/src/gnunet/service/namecache/module.go
b/src/gnunet/service/namecache/module.go
index 9d5bca1..b9aaad0 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -35,23 +35,39 @@ import (
//----------------------------------------------------------------------
// Namecache handles the transient storage of GNS blocks under the query key.
-type NamecacheModule struct {
+type Module struct {
service.ModuleImpl
cache service.DHTStore // transient block cache
}
// NewModule creates a new module instance.
-func NewModule(ctx context.Context, c *core.Core) (m *NamecacheModule) {
- m = &NamecacheModule{
+func NewModule(ctx context.Context, c *core.Core) (m *Module) {
+ m = &Module{
ModuleImpl: *service.NewModuleImpl(),
}
m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage)
return
}
+//----------------------------------------------------------------------
+
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+ // add exported functions from module
+ fcn["namecache:get"] = m.Get
+ fcn["namecache:put"] = m.Put
+}
+
+// Import functions
+func (m *Module) Import(fcm map[string]any) {
+ // nothing to import now.
+}
+
+//----------------------------------------------------------------------
+
// Get an entry from the cache if available.
-func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery)
(block *blocks.GNSBlock, err error) {
+func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block
*blocks.GNSBlock, err error) {
var b blocks.Block
b, err = m.cache.Get(query)
err = blocks.Unwrap(b, block)
@@ -59,6 +75,6 @@ func (m *NamecacheModule) Get(ctx context.Context, query
*blocks.GNSQuery) (bloc
}
// Put entry into the cache.
-func (m *NamecacheModule) Put(ctx context.Context, query *blocks.GNSQuery,
block *blocks.GNSBlock) error {
+func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block
*blocks.GNSBlock) error {
return m.cache.Put(query, block)
}
diff --git a/src/gnunet/service/revocation/module.go
b/src/gnunet/service/revocation/module.go
index 1f0ab48..eade16b 100644
--- a/src/gnunet/service/revocation/module.go
+++ b/src/gnunet/service/revocation/module.go
@@ -98,8 +98,22 @@ func (m *Module) event(ctx context.Context, ev *core.Event) {
//----------------------------------------------------------------------
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+ // add exported functions from module
+ fcn["rev:query"] = m.Query
+ fcn["rev:revoke"] = m.Revoke
+}
+
+// Import functions
+func (m *Module) Import(fcm map[string]any) {
+ // nothing to import now.
+}
+
+//----------------------------------------------------------------------
+
// Query return true if the pkey is valid (not revoked) and false
-// if the pkey has been revoked.
+// if the pkey has been revoked ["rev:query"]
func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool,
err error) {
// fast check first: is the key in the bloomfilter?
data := zkey.Bytes()
@@ -118,7 +132,7 @@ func (m *Module) Query(ctx context.Context, zkey
*crypto.ZoneKey) (valid bool, e
return false, nil
}
-// Revoke a key with given revocation data
+// Revoke a key with given revocation data ["rev:revoke"]
func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err
error) {
// verify the revocation data
diff, rc := rd.Verify(true)
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go
index b54ee4e..5dabfa2 100644
--- a/src/gnunet/transport/endpoint.go
+++ b/src/gnunet/transport/endpoint.go
@@ -30,6 +30,10 @@ import (
var (
ErrEndpNotAvailable = errors.New("no endpoint for address
available")
ErrEndpProtocolMismatch = errors.New("transport protocol mismatch")
+ ErrEndpProtocolUnknown = errors.New("unknown transport protocol")
+ ErrEndpExists = errors.New("endpoint exists")
+ ErrEndpNoAddress = errors.New("no address for endpoint")
+ ErrEndpNoConnection = errors.New("no connection on endpoint")
)
// Endpoint represents a local endpoint that can send and receive messages.
@@ -83,9 +87,12 @@ type PaketEndpoint struct {
func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage)
(err error) {
// create listener
var lc net.ListenConfig
- if ep.conn, err = lc.ListenPacket(ctx, ep.addr.Network(),
ep.addr.String()); err != nil {
+ xproto := ep.addr.Network()
+ if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto),
ep.addr.String()); err != nil {
return
}
+ // use the actual listening address
+ ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String())
// run watch dog for termination
go func() {
@@ -95,27 +102,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr
chan *TransportMessage) (
// run go routine to handle messages from clients
go func() {
for {
- // read next message from packet
- n, _, err := ep.conn.ReadFrom(ep.buf)
+ // read next message
+ tm, err := ep.read()
if err != nil {
break
}
- rdr := bytes.NewBuffer(util.Clone(ep.buf[:n]))
- msg, err := ReadMessageDirect(rdr, ep.buf)
- if err != nil {
- break
- }
- // check for transport message
- if msg.Header().MsgType == message.DUMMY {
- // set transient attributes
- tm := msg.(*TransportMessage)
- tm.endp = ep.id
- tm.conn = 0
- // send to handler
- go func() {
- hdlr <- tm
- }()
- }
+ // send transport message to handler
+ go func() {
+ hdlr <- tm
+ }()
}
// connection ended.
ep.conn.Close()
@@ -123,29 +118,73 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr
chan *TransportMessage) (
return
}
+// Read a transport message from endpoint based on extended protocol
+func (ep *PaketEndpoint) read() (tm *TransportMessage, err error) {
+ // read next packet (assuming that it contains one complete message)
+ var n int
+ if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil {
+ return
+ }
+ // parse transport message based on extended protocol
+ var (
+ peer *util.PeerID
+ msg message.Message
+ )
+ switch ep.addr.Network() {
+ case "ip+udp":
+ // parse peer id and message in sequence
+ peer = util.NewPeerID(ep.buf[:32])
+ rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n]))
+ if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil {
+ return
+ }
+ default:
+ panic(ErrEndpProtocolUnknown)
+ }
+ // return transport message
+ return &TransportMessage{
+ Peer: peer,
+ Msg: msg,
+ }, nil
+}
+
// Send message to address from endpoint
func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg
*TransportMessage) (err error) {
+ // check for valid connection
+ if ep.conn == nil {
+ return ErrEndpNoConnection
+ }
+ // resolve target address
var a *net.UDPAddr
- a, err = net.ResolveUDPAddr(addr.Network(), addr.String())
+ a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String())
+
+ // get message content (TransportMessage)
var buf []byte
if buf, err = msg.Bytes(); err != nil {
return
}
+ // handle extended protocol:
+ switch ep.addr.Network() {
+ case "ip+udp":
+ // no modifications required
+
+ default:
+ // unknown protocol
+ return ErrEndpProtocolUnknown
+ }
_, err = ep.conn.WriteTo(buf, a)
return
}
// Address returms the
func (ep *PaketEndpoint) Address() net.Addr {
- if ep.conn != nil {
- return ep.conn.LocalAddr()
- }
return ep.addr
}
// CanSendTo returns true if the endpoint can sent to address
-func (ep *PaketEndpoint) CanSendTo(addr net.Addr) bool {
- return epMode(addr.Network()) == "packet"
+func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) {
+ ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network())
+ return
}
// ID returns the endpoint identifier
@@ -153,6 +192,7 @@ func (ep *PaketEndpoint) ID() int {
return ep.id
}
+// create a new packet endpoint for protocol and address
func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) {
// check for matching protocol
if epMode(addr.Network()) != "packet" {
@@ -185,9 +225,13 @@ type StreamEndpoint struct {
func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan
*TransportMessage) (err error) {
// create listener
var lc net.ListenConfig
- if ep.listener, err = lc.Listen(ctx, ep.addr.Network(),
ep.addr.String()); err != nil {
+ xproto := ep.addr.Network()
+ if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto),
ep.addr.String()); err != nil {
return
}
+ // get actual listening address
+ ep.addr = util.NewAddress(xproto, ep.listener.Addr().String())
+
// run watch dog for termination
go func() {
<-ctx.Done()
@@ -206,21 +250,14 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr
chan *TransportMessage)
go func() {
for {
// read next message from connection
- msg, err := ReadMessage(ctx, conn,
ep.buf)
+ tm, err := ep.read(ctx, conn)
if err != nil {
break
}
- // check for transport message
- if msg.Header().MsgType ==
message.DUMMY {
- // set transient attributes
- tm := msg.(*TransportMessage)
- tm.endp = ep.id
- tm.conn = session
- // send to handler
- go func() {
- hdlr <- tm
- }()
- }
+ // send transport message to handler
+ go func() {
+ hdlr <- tm
+ }()
}
// connection ended.
conn.Close()
@@ -231,6 +268,34 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr
chan *TransportMessage)
return
}
+// Read a transport message from endpoint based on extended protocol
+func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm
*TransportMessage, err error) {
+ // parse transport message based on extended protocol
+ var (
+ peer *util.PeerID
+ msg message.Message
+ )
+ switch ep.addr.Network() {
+ case "ip+udp":
+ // parse peer id
+ peer = util.NewPeerID(nil)
+ if _, err = conn.Read(peer.Key); err != nil {
+ return
+ }
+ // read next message from connection
+ if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil {
+ break
+ }
+ default:
+ panic(ErrEndpProtocolUnknown)
+ }
+ // return transport message
+ return &TransportMessage{
+ Peer: peer,
+ Msg: msg,
+ }, nil
+}
+
// Send message to address from endpoint
func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg
*TransportMessage) error {
return nil
@@ -238,9 +303,6 @@ func (ep *StreamEndpoint) Send(ctx context.Context, addr
net.Addr, msg *Transpor
// Address returns the actual listening endpoint address
func (ep *StreamEndpoint) Address() net.Addr {
- if ep.listener != nil {
- return ep.listener.Addr()
- }
return ep.addr
}
@@ -254,6 +316,7 @@ func (ep *StreamEndpoint) ID() int {
return ep.id
}
+// create a new endpoint based on extended protocol and address
func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) {
// check for matching protocol
if epMode(addr.Network()) != "stream" {
@@ -270,10 +333,29 @@ func newStreamEndpoint(addr net.Addr) (ep
*StreamEndpoint, err error) {
return
}
+//----------------------------------------------------------------------
+// derive endpoint mode (packet/stream) and transport protocol from
+// net.Addr.Network() strings
+//----------------------------------------------------------------------
+
+// EpProtocol returns the transport protocol for a given network string
+// that can include extended protocol information like "ip+udp"; unknown
+// network strings yield an empty string.
+func EpProtocol(netw string) string {
+ switch netw {
+ case "udp", "udp4", "udp6", "ip+udp":
+ return "udp"
+ case "tcp", "tcp4", "tcp6":
+ return "tcp"
+ case "unix":
+ return "unix"
+ }
+ return ""
+}
+
// epMode returns the endpoint mode (packet or stream) for a given network
func epMode(netw string) string {
- switch netw {
- case "udp", "udp4", "udp6", "r5n+ip+udp":
+ switch EpProtocol(netw) {
+ case "udp":
return "packet"
case "tcp", "unix":
return "stream"
diff --git a/src/gnunet/transport/reader_writer.go
b/src/gnunet/transport/reader_writer.go
index 2e5f14a..db3527e 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -113,10 +113,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser,
buf []byte) (msg messag
if err = get(4, int(mh.MsgSize)-4); err != nil {
return nil, err
}
- // handle transport message case
- if mh.MsgType == message.DUMMY {
- msg = NewTransportMessage(nil, nil)
- } else if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil {
+ if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil {
return nil, err
}
if msg == nil {
diff --git a/src/gnunet/transport/transport.go
b/src/gnunet/transport/transport.go
index 14def98..d1e8249 100644
--- a/src/gnunet/transport/transport.go
+++ b/src/gnunet/transport/transport.go
@@ -25,6 +25,8 @@ import (
"gnunet/message"
"gnunet/util"
"net"
+
+ "github.com/bfix/gospel/network"
)
// Trnsport layer error codes
@@ -41,32 +43,19 @@ var (
// Msg is the exchanged GNUnet message. The packet itself satisfies the
// message.Message interface.
type TransportMessage struct {
- Hdr *message.Header `` // message header
- Peer *util.PeerID `` // remote peer
- Payload []byte `size:"*"` // GNUnet message
-
- // package-local attributes (transient)
- msg message.Message
- endp int // id of endpoint (incoming message)
- conn int // id of connection (optional, incoming message)
-}
-
-func (msg *TransportMessage) Header() *message.Header {
- return msg.Hdr
-}
-
-func (msg *TransportMessage) Message() (m message.Message, err error) {
- if m = msg.msg; m == nil {
- rdr := bytes.NewBuffer(msg.Payload)
- m, err = ReadMessageDirect(rdr, nil)
- }
- return
+ Peer *util.PeerID // remote peer
+ Msg message.Message // GNUnet message
}
// Bytes returns the binary representation of a transport message
func (msg *TransportMessage) Bytes() ([]byte, error) {
buf := new(bytes.Buffer)
- err := WriteMessageDirect(buf, msg)
+ // serialize peer id
+ if _, err := buf.Write(msg.Peer.Key); err != nil {
+ return nil, err
+ }
+ // serialize message
+ err := WriteMessageDirect(buf, msg.Msg)
return buf.Bytes(), err
}
@@ -76,21 +65,13 @@ func (msg *TransportMessage) String() string {
}
// NewTransportMessage creates a message suitable for transfer
-func NewTransportMessage(peer *util.PeerID, payload []byte) (tm
*TransportMessage) {
+func NewTransportMessage(peer *util.PeerID, msg message.Message) (tm
*TransportMessage) {
if peer == nil {
peer = util.NewPeerID(nil)
}
- msize := 0
- if payload != nil {
- msize = len(payload)
- }
tm = &TransportMessage{
- Hdr: &message.Header{
- MsgSize: uint16(36 + msize),
- MsgType: message.DUMMY,
- },
- Peer: peer,
- Payload: payload,
+ Peer: peer,
+ Msg: msg,
}
return
}
@@ -100,28 +81,41 @@ func NewTransportMessage(peer *util.PeerID, payload
[]byte) (tm *TransportMessag
// Transport enables network-oriented (like IP, UDP, TCP or UDS)
// message exchange on multiple endpoints.
type Transport struct {
- incoming chan *TransportMessage // messages as received from the
network
- endpoints map[int]Endpoint // list of available endpoints
+ incoming chan *TransportMessage // messages as received from the
network
+ endpoints *util.Map[int, Endpoint] // list of available endpoints
+ upnp *network.PortMapper // UPnP mapper (optional)
}
// NewTransport creates and runs a new transport layer implementation.
-func NewTransport(ctx context.Context, ch chan *TransportMessage) (t
*Transport) {
+func NewTransport(ctx context.Context, tag string, ch chan *TransportMessage)
(t *Transport) {
// create transport instance
+ mngr, err := network.NewPortMapper(tag)
+ if err != nil {
+ mngr = nil
+ }
return &Transport{
incoming: ch,
- endpoints: make(map[int]Endpoint),
+ endpoints: util.NewMap[int, Endpoint](),
+ upnp: mngr,
+ }
+}
+
+// Shutdown transport-related processes
+func (t *Transport) Shutdown() {
+ if t.upnp != nil {
+ t.upnp.Close()
}
}
// Send a message over suitable endpoint
func (t *Transport) Send(ctx context.Context, addr net.Addr, msg
*TransportMessage) (err error) {
- for _, ep := range t.endpoints {
+ // use the first endpoint able to handle address
+ return t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
if ep.CanSendTo(addr) {
- err = ep.Send(ctx, addr, msg)
- break
+ return ep.Send(ctx, addr, msg)
}
- }
- return
+ return nil
+ }, true)
}
//----------------------------------------------------------------------
@@ -130,22 +124,45 @@ func (t *Transport) Send(ctx context.Context, addr
net.Addr, msg *TransportMessa
// AddEndpoint instantiates and run a new endpoint handler for the
// given address (must map to a network interface).
-func (t *Transport) AddEndpoint(ctx context.Context, addr net.Addr) (a
net.Addr, err error) {
- // register endpoint
- var ep Endpoint
+func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep
Endpoint, err error) {
+ // check for valid address
+ if addr == nil {
+ err = ErrEndpNoAddress
+ return
+ }
+ // check if endpoint is already available
+ as := addr.Network() + "://" + addr.String()
+ if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
+ ae := ep.Address().Network() + "://" + ep.Address().String()
+ if as == ae {
+ return ErrEndpExists
+ }
+ return nil
+ }, true); err != nil {
+ return
+ }
+ // register new endpoint
if ep, err = NewEndpoint(addr); err != nil {
return
}
- t.endpoints[ep.ID()] = ep
+ // add endpoint to list and run it
+ t.endpoints.Put(ep.ID(), ep)
ep.Run(ctx, t.incoming)
- return ep.Address(), nil
+ return
}
-// Endpoints returns a list of listening addresses managed by transport.
-func (t *Transport) Endpoints() (list []net.Addr) {
- list = make([]net.Addr, 0)
- for _, ep := range t.endpoints {
- list = append(list, ep.Address())
- }
- return
+//----------------------------------------------------------------------
+// UPnP handling
+//----------------------------------------------------------------------
+
+// ForwardOpen returns a local address for listening that will receive traffic
+// from a port forward handled by UPnP on the router.
+func (t *Transport) ForwardOpen(protocol, param string, port int) (id, local,
remote string, err error) {
+ // no parameters currently defined, so just do the assignment.
+ return t.upnp.Assign(protocol, port)
+}
+
+// ForwardClose closes a specific port forwarding
+func (t *Transport) ForwardClose(id string) error {
+ return t.upnp.Unassign(id)
}
diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go
index 106e671..a56e95f 100644
--- a/src/gnunet/util/address.go
+++ b/src/gnunet/util/address.go
@@ -34,11 +34,11 @@ type Address struct {
}
// NewAddress returns a new Address for the given transport and specs
-func NewAddress(transport string, addr []byte) *Address {
+func NewAddress(transport string, addr string) *Address {
return &Address{
Netw: transport,
Options: 0,
- Address: Clone(addr),
+ Address: Clone([]byte(addr)),
Expires: AbsoluteTimeNever(),
}
}
@@ -61,7 +61,7 @@ func ParseAddress(s string) (addr *Address, err error) {
err = fmt.Errorf("invalid address format: '%s'", s)
return
}
- addr = NewAddress(p[0], []byte(strings.Trim(p[1], "/")))
+ addr = NewAddress(p[0], strings.Trim(p[1], "/"))
return
}
@@ -91,8 +91,11 @@ func (a *Address) Network() string {
//----------------------------------------------------------------------
-// AddressString returns a string representaion of an address.
-func AddressString(network string, addr []byte) string {
+// URI returns a string representation of an address.
+func (a *Address) URI() string {
+ return URI(a.Netw, a.Address)
+}
+func URI(network string, addr []byte) string {
return network + "://" + string(addr)
}
@@ -151,13 +154,13 @@ func (a *PeerAddrList) Add(id string, addr *Address)
(mode int) {
list = append(list, addr)
a.list.Put(id, list)
return nil
- })
+ }, false)
}
return
}
// Get address for peer
-func (a *PeerAddrList) Get(id string, transport string) *Address {
+func (a *PeerAddrList) Get(id string, transport string) (res []*Address) {
list, ok := a.list.Get(id)
if ok {
for _, addr := range list {
@@ -171,10 +174,10 @@ func (a *PeerAddrList) Get(id string, transport string) *Address {
// skip other transports
continue
}
- return addr
+ res = append(res, addr)
}
}
- return nil
+ return
}
// Delete a list entry by key.
diff --git a/src/gnunet/util/database.go b/src/gnunet/util/database.go
index a1198fd..f1d1e5f 100644
--- a/src/gnunet/util/database.go
+++ b/src/gnunet/util/database.go
@@ -121,7 +121,7 @@ func (p *dbPool) remove(key string) error {
p.insts.Delete(key)
}
return
- })
+ }, false)
}
// Connect to a SQL database (various types and flavors):
@@ -180,6 +180,6 @@ func (p *dbPool) Connect(spec string) (db *DbConn, err error) {
db = new(DbConn)
db.conn, err = inst.db.Conn(p.ctx)
return err
- })
+ }, false)
return
}
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index 7240757..4443737 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -70,43 +70,87 @@ func NewMap[K comparable, V any]() *Map[K, V] {
}
}
+//----------------------------------------------------------------------
+
// Process a function in the locked map context. Calls
-// to other map functions in 'f' will use additional locks.
-func (m *Map[K, V]) Process(f func() error) error {
- m.mtx.Lock()
- defer m.mtx.Unlock()
+// to other map functions in 'f' will skip their locks.
+func (m *Map[K, V]) Process(f func() error, readonly bool) error {
+ // handle locking
+ m.lock(readonly)
+ m.inProcess = true
+ defer func() {
+ m.inProcess = false
+ m.unlock(readonly)
+ }()
+ // function call in unlocked environment
+ return f()
+}
+
+// Process a ranged function in the locked map context. Calls
+// to other map functions in 'f' will skip their locks.
+func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool) error {
+ // handle locking
+ m.lock(readonly)
m.inProcess = true
- err := f()
- m.inProcess = false
- return err
+ defer func() {
+ m.inProcess = false
+ m.unlock(readonly)
+ }()
+ // range over map and call function.
+ for key, value := range m.list {
+ if err := f(key, value); err != nil {
+ return err
+ }
+ }
+ return nil
}
+//----------------------------------------------------------------------
+
// Put value into map under given key.
func (m *Map[K, V]) Put(key K, value V) {
- if !m.inProcess {
- m.mtx.Lock()
- defer m.mtx.Unlock()
- }
+ m.lock(false)
+ defer m.unlock(false)
m.list[key] = value
}
// Get value with iven key from map.
func (m *Map[K, V]) Get(key K) (value V, ok bool) {
- if !m.inProcess {
- m.mtx.RLock()
- defer m.mtx.RUnlock()
- }
+ m.lock(true)
+ defer m.unlock(true)
value, ok = m.list[key]
return
}
// Delete key/value pair from map.
func (m *Map[K, V]) Delete(key K) {
+ m.lock(false)
+ defer m.unlock(false)
+ delete(m.list, key)
+}
+
+//----------------------------------------------------------------------
+
+// lock with given mode (if not in processing function)
+func (m *Map[K, V]) lock(readonly bool) {
if !m.inProcess {
- m.mtx.Lock()
- defer m.mtx.Unlock()
+ if readonly {
+ m.mtx.RLock()
+ } else {
+ m.mtx.Lock()
+ }
+ }
+}
+
+// unlock with given mode (if not in processing function)
+func (m *Map[K, V]) unlock(readonly bool) {
+ if !m.inProcess {
+ if readonly {
+ m.mtx.RUnlock()
+ } else {
+ m.mtx.Unlock()
+ }
}
- delete(m.list, key)
}
//----------------------------------------------------------------------
diff --git a/src/gnunet/util/peer_id.go b/src/gnunet/util/peer_id.go
index a8202a1..a74958d 100644
--- a/src/gnunet/util/peer_id.go
+++ b/src/gnunet/util/peer_id.go
@@ -25,23 +25,19 @@ type PeerID struct {
Key []byte `size:"32"`
}
-// NewPeerID creates a new object from the data.
-func NewPeerID(data []byte) *PeerID {
- if data == nil {
- data = make([]byte, 32)
- } else {
- size := len(data)
- if size > 32 {
- data = data[:32]
- } else if size < 32 {
- buf := make([]byte, 32)
- CopyAlignedBlock(buf, data)
- data = buf
- }
+// NewPeerID creates a new peer id from data.
+func NewPeerID(data []byte) (p *PeerID) {
+ p = &PeerID{
+ Key: make([]byte, 32),
}
- return &PeerID{
- Key: data,
+ if data != nil {
+ if len(data) < 32 {
+ CopyAlignedBlock(p.Key, data)
+ } else {
+ copy(p.Key, data[:32])
+ }
}
+ return
}
// Equals returns true if two peer IDs match.
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-go] branch master updated: Improved transport and module code.,
gnunet <=