[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: Milestone 2+3 (NLnet funding)
From: |
gnunet |
Subject: |
[gnunet-go] branch master updated: Milestone 2+3 (NLnet funding) |
Date: |
Mon, 15 Aug 2022 14:03:32 +0200 |
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new b620713 Milestone 2+3 (NLnet funding)
b620713 is described below
commit b62071330cd7e0445e89660d07b7aed098f80285
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Mon Aug 15 14:02:05 2022 +0200
Milestone 2+3 (NLnet funding)
---
src/gnunet/cmd/gnunet-service-dht-go/main.go | 2 +-
src/gnunet/config/config.go | 9 +-
src/gnunet/config/gnunet-config.json | 5 +-
src/gnunet/core/hello_test.go | 4 +-
src/gnunet/core/peer_test.go | 2 +-
src/gnunet/crypto/gns.go | 4 +-
src/gnunet/crypto/hash.go | 2 +-
src/gnunet/go.mod | 4 +-
src/gnunet/go.sum | 4 +-
src/gnunet/message/msg_dht_p2p.go | 256 +++++++++++++++------
src/gnunet/service/connection.go | 4 +-
src/gnunet/service/dht/blocks/filters.go | 16 +-
src/gnunet/service/dht/blocks/handlers.go | 13 +-
src/gnunet/service/dht/blocks/hello.go | 70 +++++-
src/gnunet/service/dht/blocks/hello_test.go | 164 +++++++++++++
src/gnunet/service/dht/local.go | 58 ++---
src/gnunet/service/dht/messages.go | 191 ++++++++++-----
src/gnunet/service/dht/module.go | 128 ++++++++---
src/gnunet/service/dht/path/elements.go | 2 +-
src/gnunet/service/dht/path/handling.go | 9 +-
src/gnunet/service/dht/resulthandler.go | 45 +++-
src/gnunet/service/dht/routingtable.go | 38 +--
src/gnunet/service/dht/routingtable_test.go | 10 +-
src/gnunet/service/gns/module.go | 1 +
src/gnunet/service/namecache/module.go | 16 +-
.../service/store/{store_fs.go => store_dht.go} | 194 ++++++++++------
.../store/{store_fs_meta.go => store_dht_meta.go} | 86 +++++--
.../{store_fs_meta.sql => store_dht_meta.sql} | 1 +
.../store/{dhtstore_test.go => store_dht_test.go} | 21 +-
src/gnunet/service/store/{store.go => store_kv.go} | 68 +-----
src/gnunet/transport/responder.go | 9 +-
src/gnunet/util/map.go | 2 +-
src/gnunet/util/misc.go | 11 +
src/gnunet/util/peer.go | 6 +-
34 files changed, 1010 insertions(+), 445 deletions(-)
diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go
b/src/gnunet/cmd/gnunet-service-dht-go/main.go
index d8244d9..28d2100 100644
--- a/src/gnunet/cmd/gnunet-service-dht-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -137,7 +137,7 @@ func main() {
// check for HELLO URL
if strings.HasPrefix(bs, "gnunet://hello/") {
var hb *blocks.HelloBlock
- if hb, err = blocks.ParseHelloURL(bs, true); err != nil
{
+ if hb, err = blocks.ParseHelloBlockFromURL(bs, true);
err != nil {
logger.Printf(logger.ERROR, "[dht] failed
bootstrap HELLO URL %s: %s", bs, err.Error())
continue
}
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index cfbf705..897926f 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -90,9 +90,9 @@ type ServiceConfig struct {
// GNSConfig contains parameters for the GNU Name System service
type GNSConfig struct {
- Service *ServiceConfig `json:"service"` // socket for GNS
service
- DHTReplLevel int `json:"dhtReplLevel"` // DHT replication
level
- MaxDepth int `json:"maxDepth"` // maximum recursion
depth in resolution
+ Service *ServiceConfig `json:"service"` // socket for GNS service
+ ReplLevel int `json:"replLevel"` // DHT replication level
+ MaxDepth int `json:"maxDepth"` // maximum recursion depth
in resolution
}
//----------------------------------------------------------------------
@@ -109,7 +109,8 @@ type DHTConfig struct {
// RoutingConfig holds parameters for routing tables
type RoutingConfig struct {
- PeerTTL int `json:"peerTTL"` // time-out for peers in table
+ PeerTTL int `json:"peerTTL"` // time-out for peers in table
+ ReplLevel int `json:"replLevel"` // replication level
}
//----------------------------------------------------------------------
diff --git a/src/gnunet/config/gnunet-config.json
b/src/gnunet/config/gnunet-config.json
index 27678ab..2052b33 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -37,7 +37,8 @@
"maxGB": 10
},
"routing": {
- "peerTTL": 10800
+ "peerTTL": 10800,
+ "replLevel": 5
},
"heartbeat": 900
},
@@ -48,7 +49,7 @@
"perm": "0770"
}
},
- "dhtReplLevel": 10,
+ "replLevel": 10,
"maxDepth": 250
},
"namecache": {
diff --git a/src/gnunet/core/hello_test.go b/src/gnunet/core/hello_test.go
index 0590c90..211250c 100644
--- a/src/gnunet/core/hello_test.go
+++ b/src/gnunet/core/hello_test.go
@@ -64,7 +64,7 @@ var (
func TestHelloURLDirect(t *testing.T) {
for _, hu := range helloURL {
- if _, err := blocks.ParseHelloURL(hu, false); err != nil {
+ if _, err := blocks.ParseHelloBlockFromURL(hu, false); err !=
nil {
t.Fatal(err)
}
}
@@ -93,7 +93,7 @@ func TestHelloURL(t *testing.T) {
// convert to and from HELLO URL
url1 := hd.URL()
- hd2, err := blocks.ParseHelloURL(url1, true)
+ hd2, err := blocks.ParseHelloBlockFromURL(url1, true)
if err != nil {
t.Fatal(err)
}
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go
index 1be4d42..29fe801 100644
--- a/src/gnunet/core/peer_test.go
+++ b/src/gnunet/core/peer_test.go
@@ -67,7 +67,7 @@ func TestPeerHello(t *testing.T) {
// convert to URL and back
u := h.URL()
t.Log(u)
- h2, err := blocks.ParseHelloURL(u, true)
+ h2, err := blocks.ParseHelloBlockFromURL(u, true)
if err != nil {
t.Fatal(err)
}
diff --git a/src/gnunet/crypto/gns.go b/src/gnunet/crypto/gns.go
index 27a52d3..39eb8c5 100644
--- a/src/gnunet/crypto/gns.go
+++ b/src/gnunet/crypto/gns.go
@@ -417,7 +417,9 @@ func NewZoneSignature(d []byte) (sig *ZoneSignature, err
error) {
}
// set signature implementation
zs := impl.NewSignature()
- err = zs.Init(sig.Signature)
+ if err = zs.Init(sig.Signature); err != nil {
+ return
+ }
sig.impl = zs
// set public key implementation
zk := impl.NewPublic()
diff --git a/src/gnunet/crypto/hash.go b/src/gnunet/crypto/hash.go
index 49e57ff..a5716a0 100644
--- a/src/gnunet/crypto/hash.go
+++ b/src/gnunet/crypto/hash.go
@@ -58,7 +58,7 @@ func NewHashCode(data []byte) *HashCode {
hc := new(HashCode)
size := hc.Size()
v := make([]byte, size)
- if data != nil && len(data) > 0 {
+ if len(data) > 0 {
if uint(len(data)) < size {
util.CopyAlignedBlock(v, data)
} else {
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 8a5db53..9e01d7c 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
go 1.18
require (
- github.com/bfix/gospel v1.2.17
+ github.com/bfix/gospel v1.2.18
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
github.com/gorilla/mux v1.8.0
@@ -24,4 +24,4 @@ require (
golang.org/x/tools v0.1.11 // indirect
)
-// replace github.com/bfix/gospel v1.2.17 => ../gospel
+//replace github.com/bfix/gospel v1.2.18 => ../gospel
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index 22c7e09..2a2a36a 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,5 @@
-github.com/bfix/gospel v1.2.17 h1:Stvm+OiCA2GIWIhI/HKc6uaLDMtrJNxXgw/g+v9witw=
-github.com/bfix/gospel v1.2.17/go.mod
h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
+github.com/bfix/gospel v1.2.18 h1:X9hYudt5dvjYTGGmKC4T7qcLdb7ORblVD4kAC/ZYXdU=
+github.com/bfix/gospel v1.2.18/go.mod
h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
github.com/cespare/xxhash/v2 v2.1.2
h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
diff --git a/src/gnunet/message/msg_dht_p2p.go
b/src/gnunet/message/msg_dht_p2p.go
index ccc0406..638ff16 100644
--- a/src/gnunet/message/msg_dht_p2p.go
+++ b/src/gnunet/message/msg_dht_p2p.go
@@ -113,20 +113,20 @@ func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf
blocks.ResultFilter, hop
// DHTP2PPutMsg wire layout
type DHTP2PPutMsg struct {
- MsgSize uint16 `order:"big"` // total size of
message
- MsgType uint16 `order:"big"` // DHT_P2P_PUT (146)
- BType uint32 `order:"big"` // block type
- Flags uint16 `order:"big"` // processing flags
- HopCount uint16 `order:"big"` // message hops
- ReplLvl uint16 `order:"big"` // replication level
- PathL uint16 `order:"big"` // path length
- Expiration util.AbsoluteTime `` // expiration date
- PeerFilter *blocks.PeerFilter `` // peer bloomfilter
- Key *crypto.HashCode `` // query key to block
- TruncOrigin []byte `size:"(PESize)"` // truncated origin
(if TRUNCATED flag set)
- PutPath []*path.Entry `size:"PathL"` // PUT path
- LastSig []byte `size:"(PESize)"` // signature of last
hop (if RECORD_ROUTE flag is set)
- Block []byte `size:"*"` // block data
+ MsgSize uint16 `order:"big"` // total size of
message
+ MsgType uint16 `order:"big"` // DHT_P2P_PUT (146)
+ BType uint32 `order:"big"` // block type
+ Flags uint16 `order:"big"` // processing flags
+ HopCount uint16 `order:"big"` // message hops
+ ReplLvl uint16 `order:"big"` // replication level
+ PathL uint16 `order:"big"` // path length
+ Expiration util.AbsoluteTime `` // expiration date
+ PeerFilter *blocks.PeerFilter `` // peer bloomfilter
+ Key *crypto.HashCode `` // query key to block
+ TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin
(if TRUNCATED flag set)
+ PutPath []*path.Entry `size:"PathL"` // PUT path
+ LastSig *util.PeerSignature `opt:"(IsUsed)"` // signature of last
hop (if RECORD_ROUTE flag is set)
+ Block []byte `size:"*"` // block data
}
// NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg
@@ -149,19 +149,15 @@ func NewDHTP2PPutMsg() *DHTP2PPutMsg {
}
}
-// PESize calculates field sizes based on flags and attributes
-func (m *DHTP2PPutMsg) PESize(field string) uint {
+// IsUsed returns true if an optional field is used
+func (m *DHTP2PPutMsg) IsUsed(field string) bool {
switch field {
case "Origin":
- if m.Flags&enums.DHT_RO_TRUNCATED != 0 {
- return util.NewPeerID(nil).Size()
- }
+ return m.Flags&enums.DHT_RO_TRUNCATED != 0
case "LastSig":
- if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 {
- return util.NewPeerSignature(nil).Size()
- }
+ return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0
}
- return 0
+ return false
}
//----------------------------------------------------------------------
@@ -171,13 +167,13 @@ func (m *DHTP2PPutMsg) Update(p *path.Path, pf
*blocks.PeerFilter, hop uint16) *
msg := NewDHTP2PPutMsg()
msg.Flags = m.Flags
msg.HopCount = hop
- msg.PathL = m.PathL
+ msg.PathL = p.NumList
msg.Expiration = m.Expiration
msg.PeerFilter = pf
msg.Key = m.Key.Clone()
- msg.TruncOrigin = m.TruncOrigin
- msg.PutPath = util.Clone(m.PutPath)
- msg.LastSig = m.LastSig
+ msg.TruncOrigin = p.TruncOrigin
+ msg.PutPath = util.Clone(p.List)
+ msg.LastSig = p.LastSig
msg.Block = util.Clone(m.Block)
msg.SetPath(p)
return msg
@@ -199,11 +195,11 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID)
*path.Path {
// handle truncate origin
if m.Flags&enums.DHT_RO_TRUNCATED == 1 {
- if m.TruncOrigin == nil || len(m.TruncOrigin) == 0 {
+ if m.TruncOrigin == nil {
logger.Printf(logger.WARN, "[path] truncated but no
origin - flag reset")
m.Flags &^= enums.DHT_RO_TRUNCATED
} else {
- pth.TruncOrigin = util.NewPeerID(m.TruncOrigin)
+ pth.TruncOrigin = m.TruncOrigin
pth.Flags |= path.PathTruncated
}
}
@@ -213,12 +209,12 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID)
*path.Path {
pth.NumList = uint16(len(pth.List))
// handle last hop signature
- if m.LastSig == nil || len(m.LastSig) == 0 {
+ if m.LastSig == nil {
logger.Printf(logger.WARN, "[path] - last hop signature
missing - path reset")
return path.NewPath(crypto.Hash(m.Block), m.Expiration)
}
pth.Flags |= path.PathLastHop
- pth.LastSig = util.NewPeerSignature(m.LastSig)
+ pth.LastSig = m.LastSig
pth.LastHop = sender
return pth
}
@@ -235,7 +231,13 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) {
if len(m.PutPath) > 0 {
pes = m.PutPath[0].Size()
}
- oldSize := uint(len(m.PutPath))*pes + m.PESize("Origin") +
m.PESize("LastSig")
+ oldSize := uint(len(m.PutPath)) * pes
+ if m.TruncOrigin != nil {
+ oldSize += m.TruncOrigin.Size()
+ }
+ if m.LastSig != nil {
+ oldSize += m.LastSig.Size()
+ }
// if no new path is defined,...
if p == nil {
// ... remove existing path
@@ -254,12 +256,12 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) {
if p.TruncOrigin != nil {
// truncated path
m.Flags |= enums.DHT_RO_TRUNCATED
- m.TruncOrigin = p.TruncOrigin.Bytes()
+ m.TruncOrigin = p.TruncOrigin
}
m.PutPath = util.Clone(p.List)
m.PathL = uint16(len(m.PutPath))
if p.LastSig != nil {
- m.LastSig = p.LastSig.Bytes()
+ m.LastSig = p.LastSig
}
}
@@ -283,52 +285,174 @@ func (m *DHTP2PPutMsg) Header() *Header {
// DHTP2PResultMsg wire layout
type DHTP2PResultMsg struct {
- MsgSize uint16 `order:"big"` // total size of message
- MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148)
- BType uint32 `order:"big"` // Block type of result
- Reserved uint32 `order:"big"` // Reserved for further use
- PutPathL uint16 `order:"big"` // size of PUTPATH field
- GetPathL uint16 `order:"big"` // size of GETPATH field
- Expires util.AbsoluteTime `` // expiration date
- Query *crypto.HashCode `` // Query key for block
- Origin []byte `size:"(PESize)"` // truncated origin (if
TRUNCATED flag set)
- PutPath []*path.Entry `size:"PutPathL"` // PUTPATH
- GetPath []*path.Entry `size:"GetPathL"` // GETPATH
- LastSig []byte `size:"(PESize)"` // signature of last hop
(if RECORD_ROUTE flag is set)
- Block []byte `size:"*"` // block data
+ MsgSize uint16 `order:"big"` // total size of
message
+ MsgType uint16 `order:"big"` // DHT_P2P_RESULT
(148)
+ BType uint32 `order:"big"` // Block type of
result
+ Flags uint32 `order:"big"` // Message flags
+ PutPathL uint16 `order:"big"` // size of PUTPATH
field
+ GetPathL uint16 `order:"big"` // size of GETPATH
field
+ Expires util.AbsoluteTime `` // expiration date
+ Query *crypto.HashCode `` // Query key for
block
+ TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin
(if TRUNCATED flag set)
+ PathList []*path.Entry `size:"(NumPath)"` // PATH
+ LastSig *util.PeerSignature `size:"(IsUsed)"` // signature of last
hop (if RECORD_ROUTE flag is set)
+ Block []byte `size:"*"` // block data
}
// NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg
func NewDHTP2PResultMsg() *DHTP2PResultMsg {
return &DHTP2PResultMsg{
- MsgSize: 88, // size of empty message
- MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148)
- BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned
block
- Origin: nil, // no truncated origin
- PutPathL: 0, // empty putpath
- PutPath: nil, // -"-
- GetPathL: 0, // empty getpath
- GetPath: nil, // -"-
- LastSig: nil, // no recorded route
- Block: nil, // empty block
+ MsgSize: 88, // size of empty
message
+ MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT
(148)
+ BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned
block
+ TruncOrigin: nil, // no truncated
origin
+ PutPathL: 0, // empty putpath
+ GetPathL: 0, // empty getpath
+ PathList: nil, // empty path list
(put+get)
+ LastSig: nil, // no recorded route
+ Block: nil, // empty block
}
}
-// PESize calculates field sizes based on flags and attributes
-func (m *DHTP2PResultMsg) PESize(field string) uint {
+// IsUsed returns if an optional field is present
+func (m *DHTP2PResultMsg) IsUsed(field string) bool {
switch field {
case "Origin":
- //if m.Flags&enums.DHT_RO_TRUNCATED != 0 {
- return 32
- //}
+ return m.Flags&enums.DHT_RO_TRUNCATED != 0
case "LastSig":
- //if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 {
- return 64
- //}
+ return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0
+ }
+ return false
+}
+
+// NumPath returns the total number of entries in path
+func (m *DHTP2PResultMsg) NumPath(field string) uint {
+ return uint(m.GetPathL + m.PutPathL)
+}
+
+//----------------------------------------------------------------------
+// Path handling (get/set path in message)
+//----------------------------------------------------------------------
+
+// Path returns the current path from message
+func (m *DHTP2PResultMsg) Path(sender *util.PeerID) *path.Path {
+ // create a "real" path list from message data
+ pth := path.NewPath(crypto.Hash(m.Block), m.Expires)
+
+ // return empty path if recording is switched off
+ if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 {
+ return pth
+ }
+ // handle truncate origin
+ if m.Flags&enums.DHT_RO_TRUNCATED == 1 {
+ if m.TruncOrigin == nil {
+ logger.Printf(logger.WARN, "[path] truncated but no
origin - flag reset")
+ m.Flags &^= enums.DHT_RO_TRUNCATED
+ } else {
+ pth.TruncOrigin = m.TruncOrigin
+ pth.Flags |= path.PathTruncated
+ }
+ }
+ // copy path elements
+ pth.List = util.Clone(m.PathList)
+ pth.NumList = uint16(len(pth.List))
+
+ // check consistent length values; adjust if mismatched
+ if m.GetPathL+m.PutPathL != pth.NumList {
+ logger.Printf(logger.WARN, "[path] Inconsistent PATH length --
adjusting...")
+ if sp := pth.NumList - m.PutPathL; sp > 0 {
+ pth.SplitPos = sp
+ } else {
+ pth.SplitPos = 0
+ }
+ } else {
+ pth.SplitPos = pth.NumList - m.PutPathL
+ }
+ // handle last hop signature
+ if m.LastSig == nil {
+ logger.Printf(logger.WARN, "[path] - last hop signature
missing - path reset")
+ return path.NewPath(crypto.Hash(m.Block), m.Expires)
+ }
+ pth.Flags |= path.PathLastHop
+ pth.LastSig = m.LastSig
+ pth.LastHop = sender
+ return pth
+}
+
+// Set path in message; corrects the message size accordingly
+func (m *DHTP2PResultMsg) SetPath(p *path.Path) {
+
+ // return if recording is switched off (don't touch path)
+ if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 {
+ return
+ }
+ // compute old path size
+ var pes uint
+ if len(m.PathList) > 0 {
+ pes = m.PathList[0].Size()
+ }
+ oldSize := uint(len(m.PathList)) * pes
+ if m.TruncOrigin != nil {
+ oldSize += m.TruncOrigin.Size()
+ }
+ if m.LastSig != nil {
+ oldSize += m.LastSig.Size()
+ }
+ // if no new path is defined,...
+ if p == nil {
+ // ... remove existing path
+ m.TruncOrigin = nil
+ m.PathList = make([]*path.Entry, 0)
+ m.LastSig = nil
+ m.GetPathL = 0
+ m.PutPathL = 0
+ m.Flags &^= enums.DHT_RO_TRUNCATED
+ m.MsgSize -= uint16(oldSize)
+ return
+ }
+ // adjust message size
+ m.MsgSize += uint16(p.Size() - oldSize)
+
+ // transfer path data
+ if p.TruncOrigin != nil {
+ // truncated path
+ m.Flags |= enums.DHT_RO_TRUNCATED
+ m.TruncOrigin = p.TruncOrigin
+ }
+ m.PathList = util.Clone(p.List)
+ m.PutPathL = p.SplitPos
+ m.GetPathL = p.NumList - p.SplitPos
+ if p.LastSig != nil {
+ m.LastSig = p.LastSig
+ }
+}
+
+//----------------------------------------------------------------------
+
+// Update message (forwarding)
+func (m *DHTP2PResultMsg) Update(pth *path.Path) *DHTP2PResultMsg {
+ // clone old message
+ msg := &DHTP2PResultMsg{
+ MsgSize: m.MsgSize,
+ MsgType: m.MsgType,
+ BType: m.BType,
+ Flags: m.Flags,
+ PutPathL: m.PutPathL,
+ GetPathL: m.GetPathL,
+ Expires: m.Expires,
+ Query: m.Query.Clone(),
+ TruncOrigin: m.TruncOrigin,
+ PathList: util.Clone(m.PathList),
+ LastSig: m.LastSig,
+ Block: util.Clone(m.Block),
}
- return 0
+ // set new path
+ msg.SetPath(pth)
+ return msg
}
+//----------------------------------------------------------------------
+
// String returns a human-readable representation of the message.
func (m *DHTP2PResultMsg) String() string {
return fmt.Sprintf("DHTP2PResultMsg{btype=%s,putl=%d,getl=%d}",
diff --git a/src/gnunet/service/connection.go b/src/gnunet/service/connection.go
index d443160..fbc24d0 100644
--- a/src/gnunet/service/connection.go
+++ b/src/gnunet/service/connection.go
@@ -138,8 +138,8 @@ func (s *Connection) Receive(ctx context.Context)
(message.Message, error) {
}
// Receiver returns the receiving client (string representation)
-func (s *Connection) Receiver() string {
- return fmt.Sprintf("uds:%d", s.id)
+func (s *Connection) Receiver() *util.PeerID {
+ return nil
}
//----------------------------------------------------------------------
diff --git a/src/gnunet/service/dht/blocks/filters.go
b/src/gnunet/service/dht/blocks/filters.go
index 26af0b3..f617479 100644
--- a/src/gnunet/service/dht/blocks/filters.go
+++ b/src/gnunet/service/dht/blocks/filters.go
@@ -22,6 +22,7 @@ import (
"bytes"
"crypto/sha512"
"encoding/binary"
+ "gnunet/crypto"
"gnunet/util"
"github.com/bfix/gospel/logger"
@@ -105,9 +106,12 @@ type ResultFilter interface {
// Add entry to filter
Add(Block)
- // Contains returns true if entry is filtered
+ // Contains returns true if block is filtered
Contains(Block) bool
+ // ContainsHash returns true if block hash is filtered
+ ContainsHash(bh *crypto.HashCode) bool
+
// Bytes returns the binary representation of a result filter
Bytes() []byte
@@ -140,9 +144,15 @@ func (rf *GenericResultFilter) Add(b Block) {
rf.bf.Add(b.Bytes())
}
-// Contains returns true if entry (binary representation) is filtered
+// Contains returns true if a block is filtered
func (rf *GenericResultFilter) Contains(b Block) bool {
- return rf.bf.Contains(b.Bytes())
+ bh := crypto.Hash(b.Bytes())
+ return rf.bf.Contains(bh.Bits)
+}
+
+// ContainsHash returns true if a block hash is filtered
+func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool {
+ return rf.bf.Contains(bh.Bits)
}
// Bytes returns the binary representation of a result filter
diff --git a/src/gnunet/service/dht/blocks/handlers.go
b/src/gnunet/service/dht/blocks/handlers.go
index 9df3867..c166504 100644
--- a/src/gnunet/service/dht/blocks/handlers.go
+++ b/src/gnunet/service/dht/blocks/handlers.go
@@ -29,15 +29,22 @@ type BlockHandler interface {
// Parse a block instance from binary data
ParseBlock(buf []byte) (Block, error)
- // ValidateBlockQuery is used to evaluate the request for a block as
part of
- // DHT-P2P-GET processing. Here, the block payload is unknown, but if
possible
- // the XQuery and Key SHOULD be verified.
+ // ValidateBlockQuery is used to evaluate the request for a block as
part
+ // of DHT-P2P-GET processing. Here, the block payload is unknown, but if
+ // possible the XQuery and Key SHOULD be verified.
ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool
// ValidateBlockKey returns true if the block key is the same as the
// query key used to access the block.
ValidateBlockKey(b Block, key *crypto.HashCode) bool
+ // DeriveBlockKey is used to synthesize the block key from the block
+ // payload as part of PutMessage and ResultMessage processing. The
special
+ // return value of 'nil' implies that this block type does not permit
+ // deriving the key from the block. A Key may be returned for a block
that
+ // is ill-formed.
+ DeriveBlockKey(b Block) *crypto.HashCode
+
// ValidateBlockStoreRequest is used to evaluate a block payload as
part of
// PutMessage and ResultMessage processing.
ValidateBlockStoreRequest(b Block) bool
diff --git a/src/gnunet/service/dht/blocks/hello.go
b/src/gnunet/service/dht/blocks/hello.go
index 32f803f..082e914 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
+ "time"
"github.com/bfix/gospel/crypto/ed25519"
"github.com/bfix/gospel/data"
@@ -63,8 +64,21 @@ type HelloBlock struct {
addrs []*util.Address // cooked address data
}
+// NewHelloBlock initializes a new HELLO block (unsigned)
+func NewHelloBlock(peer *util.PeerID, addrs []*util.Address, ttl
time.Duration) *HelloBlock {
+ hb := new(HelloBlock)
+ hb.PeerID = peer
+ // limit expiration to second precision (HELLO-URL compatibility)
+ hb.Expires =
util.NewAbsoluteTimeEpoch(uint64(time.Now().Add(ttl).Unix()))
+ hb.SetAddresses(addrs)
+ return hb
+}
+
// SetAddresses adds a bulk of addresses for this HELLO block.
func (h *HelloBlock) SetAddresses(a []*util.Address) {
+ if len(a) == 0 {
+ return
+ }
h.addrs = util.Clone(a)
if err := h.finalize(); err != nil {
logger.Printf(logger.ERROR, "[HelloBlock.SetAddresses] failed:
%s", err.Error())
@@ -76,10 +90,10 @@ func (h *HelloBlock) Addresses() []*util.Address {
return util.Clone(h.addrs)
}
-// ParseHelloURL parses a HELLO URL of the following form:
+// ParseHelloBlockFromURL parses a HELLO URL of the following form:
// gnunet://hello/<PeerID>/<signature>/<expire>?<addrs>
// The addresses are encoded.
-func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) {
+func ParseHelloBlockFromURL(u string, checkExpiry bool) (h *HelloBlock, err
error) {
// check and trim prefix
if !strings.HasPrefix(u, helloPrefix) {
err = fmt.Errorf("invalid HELLO-URL prefix: '%s'", u)
@@ -158,8 +172,8 @@ func ParseHelloURL(u string, checkExpiry bool) (h
*HelloBlock, err error) {
return
}
-// ParseHelloFromBytes converts a byte array into a HelloBlock instance.
-func ParseHelloFromBytes(buf []byte) (h *HelloBlock, err error) {
+// ParseHelloBlockFromBytes converts a byte array into a HelloBlock instance.
+func ParseHelloBlockFromBytes(buf []byte) (h *HelloBlock, err error) {
h = new(HelloBlock)
if err = data.Unmarshal(h, buf); err == nil {
err = h.finalize()
@@ -289,7 +303,7 @@ func (h *HelloBlock) SignedData() []byte {
err := binary.Write(buf, binary.BigEndian, size)
if err == nil {
if err = binary.Write(buf, binary.BigEndian, purpose); err ==
nil {
- if err = binary.Write(buf, binary.BigEndian,
h.Expires.Epoch()*1000000); err == nil {
+ if err = binary.Write(buf, binary.BigEndian, h.Expires
/*.Epoch()*1000000*/); err == nil {
if n, err = buf.Write(hAddr[:]); err == nil {
if n != len(hAddr[:]) {
err = errors.New("signed data
size mismatch")
@@ -313,7 +327,7 @@ type HelloBlockHandler struct{}
// Parse a block instance from binary data
func (bh *HelloBlockHandler) ParseBlock(buf []byte) (Block, error) {
- return ParseHelloFromBytes(buf)
+ return ParseHelloBlockFromBytes(buf)
}
// ValidateHelloBlockQuery validates query parameters for a
@@ -326,20 +340,49 @@ func (bh *HelloBlockHandler) ValidateBlockQuery(key
*crypto.HashCode, xquery []b
// ValidateBlockKey returns true if the block key is the same as the
// query key used to access the block.
func (bh *HelloBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode)
bool {
+ // check for matching keys
+ bkey := bh.DeriveBlockKey(b)
+ if bkey == nil {
+ logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockKey: not
a HELLO block")
+ return false
+ }
+ return key.Equals(bkey)
+}
+
+// DeriveBlockKey is used to synthesize the block key from the block
+// payload as part of PutMessage and ResultMessage processing. The special
+// return value of 'nil' implies that this block type does not permit
+// deriving the key from the block. A Key may be returned for a block that
+// is ill-formed.
+func (bh *HelloBlockHandler) DeriveBlockKey(b Block) *crypto.HashCode {
+ // check for correct type
hb, ok := b.(*HelloBlock)
if !ok {
- return false
+ logger.Println(logger.WARN, "[HelloHdlr] DeriveBlockKey: not a
HELLO block")
+ return nil
}
// key must be the hash of the peer id
- bkey := crypto.Hash(hb.PeerID.Bytes())
- return key.Equals(bkey)
+ return crypto.Hash(hb.PeerID.Bytes())
}
// ValidateBlockStoreRequest is used to evaluate a block payload as part of
// PutMessage and ResultMessage processing.
+// To validate a block store request is to verify the EdDSA SIGNATURE over
+// the hashed ADDRESSES against the public key from the peer ID field. If the
+// signature is valid true is returned.
func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool {
- // TODO: verify block payload
- return true
+ // check for correct type
+ hb, ok := b.(*HelloBlock)
+ if !ok {
+ logger.Println(logger.WARN, "[HelloHdlr]
ValidateBlockStoreRequest: not a HELLO block")
+ return false
+ }
+ // verify signature
+ ok, err := hb.Verify()
+ if err != nil {
+ ok = false
+ }
+ return ok
}
// SetupResultFilter is used to setup an empty result filter. The arguments
@@ -428,6 +471,11 @@ func (rf *HelloResultFilter) Contains(b Block) bool {
return false
}
+// ContainsHash checks if a block hash is contained in the result filter
+func (rf *HelloResultFilter) ContainsHash(bh *crypto.HashCode) bool {
+ return rf.bf.Contains(bh.Bits)
+}
+
// Bytes returns a binary representation of a HELLO result filter
func (rf *HelloResultFilter) Bytes() []byte {
return rf.bf.Bytes()
diff --git a/src/gnunet/service/dht/blocks/hello_test.go
b/src/gnunet/service/dht/blocks/hello_test.go
new file mode 100644
index 0000000..deb8041
--- /dev/null
+++ b/src/gnunet/service/dht/blocks/hello_test.go
@@ -0,0 +1,164 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package blocks
+
+import (
+ "bytes"
+ "encoding/base64"
+ "encoding/hex"
+ "gnunet/util"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/bfix/gospel/crypto/ed25519"
+ "github.com/bfix/gospel/data"
+)
+
+var (
+ block *HelloBlock
+ sk *ed25519.PrivateKey
+)
+
+func setup(t *testing.T) {
+ t.Helper()
+
+ // check for initialized values
+ if block != nil {
+ return
+ }
+ // generate keys
+ var pk *ed25519.PublicKey
+ pk, sk = ed25519.NewKeypair()
+ peer := util.NewPeerID(pk.Bytes())
+
+ // set addresses
+ addrs := []string{
+ "ip+udp://172.17.0.6:2086",
+ "ip+udp://245.23.42.67:2086",
+ }
+ addrList := make([]*util.Address, 0)
+ for _, addr := range addrs {
+ frag := strings.Split(addr, "://")
+ e := util.NewAddress(frag[0], frag[1])
+ if e == nil {
+ t.Fatal("invalid address: " + addr)
+ }
+ addrList = append(addrList, e)
+ }
+
+ // create new HELLO block
+ block = NewHelloBlock(peer, addrList, time.Hour)
+
+ // sign block.
+ sig, err := sk.EdSign(block.SignedData())
+ if err != nil {
+ t.Fatal(err)
+ }
+ block.Signature = util.NewPeerSignature(sig.Bytes())
+}
+
+func TestHelloVerify(t *testing.T) {
+ setup(t)
+
+ // verify signature
+ ok, err := block.Verify()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !ok {
+ t.Fatal("HELLO verify failed")
+ }
+}
+
+func TestHelloURL(t *testing.T) {
+ setup(t)
+
+ // create HELLO URL
+ url := block.URL()
+ t.Log(url)
+
+ // read back
+ tblk, err := ParseHelloBlockFromURL(url, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // verify identical blocks
+ if !bytes.Equal(tblk.Bytes(), block.Bytes()) {
+ t.Log(hex.EncodeToString(tblk.Bytes()))
+ t.Log(hex.EncodeToString(block.Bytes()))
+ t.Fatal("URL readback failed")
+ }
+}
+
+func TestHelloBytes(t *testing.T) {
+ setup(t)
+
+ buf := block.Bytes()
+ tblk, err := ParseHelloBlockFromBytes(buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // verify identical blocks
+ if !bytes.Equal(tblk.Bytes(), block.Bytes()) {
+ t.Log(hex.EncodeToString(tblk.Bytes()))
+ t.Log(hex.EncodeToString(block.Bytes()))
+ t.Fatal("Bytes readback failed")
+ }
+}
+
+func TestHelloDebug(t *testing.T) {
+ blkData := "QKObXJUbnnghRh9McDDjHaB9IIL6MhhEiQHc8VfO3QMABeZZJJhsA" +
+ "GlwK3VkcDovLzEyNy4wLjAuMToxMDAwMQBpcCt1ZHA6Ly8xNzIuMT" +
+ "cuMC40OjEwMDAxAGlwK3VkcDovL1s6OmZmZmY6MTcyLjE3LjAuNF06MTAwMDEA"
+ buf, err := base64.RawStdEncoding.DecodeString(blkData)
+ if err != nil {
+ t.Fatal(err)
+ }
+ hb, err := ParseHelloBlockFromBytes(buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ok, err := hb.Verify()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !ok {
+ // trace problem
+ t.Log("Block: " + hex.EncodeToString(buf))
+ t.Log("PeerID: " + hb.PeerID.String())
+ t.Log(" -> " + hex.EncodeToString(hb.PeerID.Bytes()))
+ t.Logf("Expire: %d", hb.Expires.Val)
+ t.Logf(" -> " + hb.Expires.String())
+ var exp util.AbsoluteTime
+ if err = data.Unmarshal(&exp, buf[32:40]); err != nil {
+ t.Fatal(err)
+ }
+ t.Logf(" -> " + exp.String())
+ t.Log("AddrBin: " + hex.EncodeToString(hb.AddrBin))
+ sd := hb.SignedData()
+ t.Log("SignedData: " + hex.EncodeToString(sd))
+ t.Log("Addresses:")
+ for _, addr := range hb.Addresses() {
+ t.Logf("* " + addr.URI())
+ }
+ t.Log("Signature: " + hex.EncodeToString(hb.Signature.Bytes()))
+ t.Fatal("debug HELLO verify failed")
+ }
+}
diff --git a/src/gnunet/service/dht/local.go b/src/gnunet/service/dht/local.go
index 1e6b100..ba76892 100644
--- a/src/gnunet/service/dht/local.go
+++ b/src/gnunet/service/dht/local.go
@@ -19,7 +19,6 @@
package dht
import (
- "errors"
"gnunet/enums"
"gnunet/service/dht/blocks"
"gnunet/service/store"
@@ -28,58 +27,37 @@ import (
"github.com/bfix/gospel/math"
)
-// getHelloCache tries to find the requested HELLO block in the HELLO cache
-func (m *Module) getHelloCache(label string, addr *PeerAddress, rf
blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int) {
+// lookupHelloCache tries to find the requested HELLO block in the HELLO cache
+func (m *Module) lookupHelloCache(label string, addr *PeerAddress, rf
blocks.ResultFilter, approx bool) (results []*store.DHTResult) {
logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache",
label)
// find best cached HELLO
- var block blocks.Block
- block, dist = m.rtable.BestHello(addr, rf)
-
- // if block is filtered, skip it
- if block != nil {
- if !rf.Contains(block) {
- entry = &store.DHTEntry{Blk: block}
- } else {
- logger.Printf(logger.DBG, "[%s] GET message for HELLO:
matching DHT block is filtered", label)
- entry = nil
- dist = nil
- }
- }
- return
+ return m.rtable.LookupHello(addr, rf, approx)
}
// getLocalStorage tries to find the requested block in local storage
-func (m *Module) getLocalStorage(label string, query blocks.Query, rf
blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int, err error) {
+func (m *Module) getLocalStorage(label string, query blocks.Query, rf
blocks.ResultFilter) (results []*store.DHTResult, err error) {
- // query DHT store for exact match (9.4.3.3c)
- if entry, err = m.store.Get(query); err != nil {
+ // query DHT store for exact matches (9.4.3.3c)
+ var entries []*store.DHTEntry
+ if entries, err = m.store.Get(label, query, rf); err != nil {
logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from
storage: %s", label, err.Error())
return
}
- if entry != nil {
- dist = math.ZERO
- // check if we are filtered out
- if rf.Contains(entry.Blk) {
- logger.Printf(logger.DBG, "[%s] matching DHT block is
filtered", label)
- entry = nil
- dist = nil
+ for _, entry := range entries {
+ // add entry to result list
+ result := &store.DHTResult{
+ Entry: entry,
+ Dist: math.ZERO,
}
+ results = append(results, result)
+ // add to result filter
+ rf.Add(entry.Blk)
}
// if we have no exact match, find approximate block if requested
- if entry == nil || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 {
+ if len(results) == 0 || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE !=
0 {
// no exact match: find approximate (9.4.3.3b)
- match := func(e *store.DHTEntry) bool {
- return rf.Contains(e.Blk)
- }
- var d any
- entry, d, err = m.store.GetApprox(query, match)
- var ok bool
- dist, ok = d.(*math.Int)
- if !ok {
- err = errors.New("no approx distance")
- }
- if err != nil {
- logger.Printf(logger.ERROR, "[%s] Failed to get
(approx.) DHT block from storage: %s", label, err.Error())
+ if results, err = m.store.GetApprox(label, query, rf); err !=
nil {
+ logger.Printf(logger.ERROR, "[%s] Failed to get
(approx.) DHT blocks from storage: %s", label, err.Error())
}
}
return
diff --git a/src/gnunet/service/dht/messages.go
b/src/gnunet/service/dht/messages.go
index deb8461..76a92b6 100644
--- a/src/gnunet/service/dht/messages.go
+++ b/src/gnunet/service/dht/messages.go
@@ -20,6 +20,7 @@ package dht
import (
"context"
+ "gnunet/crypto"
"gnunet/enums"
"gnunet/message"
"gnunet/service/dht/blocks"
@@ -29,7 +30,6 @@ import (
"gnunet/util"
"github.com/bfix/gospel/logger"
- "github.com/bfix/gospel/math"
)
//----------------------------------------------------------------------
@@ -38,6 +38,7 @@ import (
// HandleMessage handles a DHT request/response message. Responses are sent
// to the specified responder.
+//nolint:gocyclo // life sometimes is complex...
func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn
message.Message, back transport.Responder) bool {
// assemble log label
label := "dht"
@@ -58,15 +59,18 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
// process message
switch msg := msgIn.(type) {
+ //==================================================================
+ // DHT-P2P-GET
+ //==================================================================
case *message.DHTP2PGetMsg:
//--------------------------------------------------------------
// DHT-P2P GET
//--------------------------------------------------------------
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message",
label)
- query := blocks.NewGenericQuery(msg.Query,
enums.BlockType(msg.BType), msg.Flags)
- var entry *store.DHTEntry
- var dist *math.Int
+ // assemble query and initialize (cache) results
+ query := blocks.NewGenericQuery(msg.Query,
enums.BlockType(msg.BType), msg.Flags)
+ var results []*store.DHTResult
//--------------------------------------------------------------
// validate query (based on block type requested) (9.4.3.1)
@@ -113,7 +117,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0)
demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0
approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0
- // actions
+
+ // enforced actions
doResult := closest || (demux && approx)
doForward := !closest || (demux && !approx)
logger.Printf(logger.DBG, "[%s] GET message: closest=%v,
demux=%v, approx=%v --> result=%v, forward=%v",
@@ -122,53 +127,43 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
//------------------------------------------------------
// query for a HELLO? (9.4.3.3a)
if btype == enums.BLOCK_TYPE_DHT_URL_HELLO {
- // try to find result in HELLO cache
- entry, dist = m.getHelloCache(label, addr, rf)
+ // try to find results in HELLO cache
+ results = m.lookupHelloCache(label, addr, rf, approx)
}
+
//--------------------------------------------------------------
- // find the closest block that has that is not filtered by the
result
- // filter (in case we did not find an appropriate block in
cache).
+ // query flags demand a result
if doResult {
- // save best-match values from cache
- entryCache := entry
- distCache := dist
- dist = nil
-
- // if we don't have an exact match, try storage lookup
- if entryCache == nil || (distCache != nil &&
!distCache.Equals(math.ZERO)) {
- // get entry from local storage
- var err error
- if entry, dist, err = m.getLocalStorage(label,
query, rf); err != nil {
- entry = nil
- dist = nil
- }
- // if we have a block from cache, check if it
is better than the
- // block found in the DHT
- if entryCache != nil && dist != nil &&
distCache.Cmp(dist) < 0 {
- entry = entryCache
- dist = distCache
+ // if we don't have a result from cache or are in
approx mode,
+ // try storage lookup
+ if len(results) == 0 || approx {
+ // get results from local storage
+ lclResults, err := m.getLocalStorage(label,
query, rf)
+ if err == nil {
+ // append local results
+ results = append(results, lclResults...)
}
}
- // if we have a block, send it as response
- if entry != nil {
+ // if we have results, send them as response
+ for _, result := range results {
+ var pth *path.Path
+ // check if record the route
+ if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 &&
result.Entry.Path != nil {
+ // update get path
+ pth = result.Entry.Path.Clone()
+ pth.SplitPos = pth.NumList
+ pe := pth.NewElement(pth.LastHop,
local, back.Receiver())
+ pth.Add(pe)
+ }
+
logger.Printf(logger.INFO, "[%s] sending DHT
result message to caller", label)
- if err := m.sendResult(ctx, query, entry.Blk,
back); err != nil {
+ if err := m.sendResult(ctx, query,
result.Entry.Blk, pth, back); err != nil {
logger.Printf(logger.ERROR, "[%s]
Failed to send DHT result message: %s", label, err.Error())
}
}
}
- // check if we need to forward message based on filter result
- if entry != nil && blockHdlr != nil {
- switch blockHdlr.FilterResult(entry.Blk, query.Key(),
rf, msg.XQuery) {
- case blocks.RF_LAST:
- // no need for further results
- case blocks.RF_MORE:
- // possibly more results
- doForward = true
- case blocks.RF_DUPLICATE, blocks.RF_IRRELEVANT:
- // do not forward
- }
- }
+ //--------------------------------------------------------------
+ // query flags demand a result
if doForward {
// build updated GET message
pf.Add(local)
@@ -195,6 +190,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
}
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message
done", label)
+ //==================================================================
+ // DHT-P2P-PUT
+ //==================================================================
case *message.DHTP2PPutMsg:
//----------------------------------------------------------
// DHT-P2P PUT
@@ -267,29 +265,29 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
logger.Printf(logger.ERROR, "[%s] failed to
store DHT entry: %s", label, err.Error())
}
}
-
//--------------------------------------------------------------
// if the put is for a HELLO block, add the sender to the
// routing table (9.3.2.9)
if btype == enums.BLOCK_TYPE_DHT_HELLO {
// get addresses from HELLO block
- hello, err := blocks.ParseHelloFromBytes(msg.Block)
+ hello, err := blocks.ParseHelloBlockFromBytes(msg.Block)
if err != nil {
logger.Printf(logger.ERROR, "[%s] failed to
parse HELLO block: %s", label, err.Error())
} else {
// check state of bucket for given address
- if m.rtable.Check(NewPeerAddress(sender)) == 0 {
+ if m.rtable.Check(NewPeerAddress(hello.PeerID))
== 0 {
// we could add the sender to the
routing table
for _, addr := range hello.Addresses() {
if
transport.CanHandleAddress(addr) {
// try to connect to
peer (triggers EV_CONNECTED on success)
-
m.core.TryConnect(sender, addr)
+ if err :=
m.core.TryConnect(sender, addr); err != nil {
+
logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label,
addr.URI(), err.Error())
+ }
}
}
}
}
}
-
//--------------------------------------------------------------
// check if we need to forward
if !closest || demux {
@@ -325,24 +323,100 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
}
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message
done", label)
+ //==================================================================
+ // DHT-P2P-RESULT
+ //==================================================================
case *message.DHTP2PResultMsg:
//----------------------------------------------------------
// DHT-P2P RESULT
//----------------------------------------------------------
- logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT
message", label)
+ logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT
message for type %s",
+ label, enums.BlockType(msg.BType).String())
- // check task list for handler
+ //--------------------------------------------------------------
+ // check if request is expired (9.5.2.1)
+ if msg.Expires.Expired() {
+ logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT message
expired (%s)",
+ label, msg.Expires.String())
+ return false
+ }
+ //--------------------------------------------------------------
+ btype := enums.BlockType(msg.BType)
+ var blkKey *crypto.HashCode
+ blockHdlr, ok := blocks.BlockHandlers[btype]
+ if ok {
+ // reconstruct block instance
+ if block, err := blockHdlr.ParseBlock(msg.Block); err
== nil {
+ // validate block (9.5.2.2)
+ if !blockHdlr.ValidateBlockStoreRequest(block) {
+ logger.Printf(logger.WARN, "[%s]
DHT-P2P-RESULT invalid block -- discarded", label)
+ return false
+ }
+ // Compute block key (9.5.2.4)
+ blkKey = blockHdlr.DeriveBlockKey(block)
+ }
+ } else {
+ logger.Printf(logger.INFO, "[%s] No validator defined
for block type %s", label, btype.String())
+ blockHdlr = nil
+ }
+ //--------------------------------------------------------------
+ // verify path (9.5.2.3)
+ var pth *path.Path
+ if msg.GetPathL+msg.PutPathL > 0 {
+ pth = msg.Path(sender)
+ pth.Verify(local)
+ }
+ //--------------------------------------------------------------
+ // if the put is for a HELLO block, add the originator to the
+ // routing table (9.5.2.5)
+ if btype == enums.BLOCK_TYPE_DHT_HELLO {
+ // get addresses from HELLO block
+ hello, err := blocks.ParseHelloBlockFromBytes(msg.Block)
+ if err != nil {
+ logger.Printf(logger.ERROR, "[%s] failed to
parse HELLO block: %s", label, err.Error())
+ } else {
+ // check state of bucket for given address
+ if m.rtable.Check(NewPeerAddress(hello.PeerID))
== 0 {
+ // we could add the originator to the
routing table
+ for _, addr := range hello.Addresses() {
+ if
transport.CanHandleAddress(addr) {
+ // try to connect to
peer (triggers EV_CONNECTED on success)
+ if err :=
m.core.TryConnect(sender, addr); err != nil {
+
logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label,
addr.URI(), err.Error())
+ }
+ }
+ }
+ }
+ }
+ }
+ // message forwarding to responder
key := msg.Query.String()
logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s",
label, key)
handled := false
if list, ok := m.reshdlrs.Get(key); ok {
for _, rh := range list {
logger.Printf(logger.DBG, "[%s] Task #%d for
DHT-P2P-RESULT found", label, rh.ID())
- // handle the message
- go rh.Handle(ctx, msg)
+
+
//--------------------------------------------------------------
+ // check task list for handler (9.5.2.6)
+ if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE ==
0 && blkKey != nil && !blkKey.Equals(rh.Key()) {
+ // (9.5.2.6.a) derived key mismatch
+ logger.Printf(logger.ERROR, "[%s]
derived block key / query key mismatch:", label)
+ logger.Printf(logger.ERROR, "[%s] -->
%s != %s", label, blkKey.String(), rh.Key().String())
+ return false
+ }
+ // (9.5.2.6.b+c) check block against query
+ /*
+ if blockHdlr != nil {
+
blockHdlr.FilterBlockResult(block, rh.Key())
+ }
+ */
+
+
//--------------------------------------------------------------
+ // handle the message (forwarding)
+ go rh.Handle(ctx, msg, pth, sender, local)
handled = true
}
- return true
}
if !handled {
logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not
processed (no handler)", label)
@@ -350,6 +424,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT
message done", label)
return handled
+ //==================================================================
+ // DHT-P2P-HELLO
+ //==================================================================
case *message.DHTP2PHelloMsg:
//----------------------------------------------------------
// DHT-P2P HELLO
@@ -379,8 +456,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
logger.Printf(logger.INFO, "[%s] Sending HELLO to %s:
%s", label, sender, msgOut)
err = m.core.Send(ctx, sender, msgOut)
// no error if the message might have been sent
- if err == transport.ErrEndpMaybeSent {
- err = nil
+ if err != nil && err != transport.ErrEndpMaybeSent {
+ logger.Printf(logger.ERROR, "[%s] Failed to
send HELLO message: %s", label, err.Error())
}
}
@@ -401,9 +478,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
})
}
- //--------------------------------------------------------------
+ //==================================================================
// Legacy message types (not implemented)
- //--------------------------------------------------------------
+ //==================================================================
case *message.DHTClientPutMsg:
//----------------------------------------------------------
@@ -446,7 +523,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
}
// send a result back to caller
-func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk
blocks.Block, back transport.Responder) error {
+func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk
blocks.Block, pth *path.Path, back transport.Responder) error {
// assemble result message
out := message.NewDHTP2PResultMsg()
out.BType = uint32(query.Type())
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 6207f94..5278b13 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -24,6 +24,7 @@ import (
"gnunet/config"
"gnunet/core"
"gnunet/crypto"
+ "gnunet/enums"
"gnunet/message"
"gnunet/service"
"gnunet/service/dht/blocks"
@@ -33,7 +34,6 @@ import (
"time"
"github.com/bfix/gospel/logger"
- "github.com/bfix/gospel/math"
)
//======================================================================
@@ -41,22 +41,51 @@ import (
//======================================================================
//----------------------------------------------------------------------
-// Responder for local message handling
+// Responder for local message handling (API, not message-based)
//----------------------------------------------------------------------
-type LocalResponder struct {
- ch chan blocks.Block // out-going channel for incoming blocks
+// LocalBlockResponder is a message handler used to handle results for
+// locally initiated GET calls
+type LocalBlockResponder struct {
+ ch chan blocks.Block // out-going channel for incoming block results
+ rf blocks.ResultFilter // filter out duplicates
}
-func (lr *LocalResponder) Send(ctx context.Context, msg message.Message) error
{
+// NewLocalBlockResponder returns a new instance
+func NewLocalBlockResponder() *LocalBlockResponder {
+ return &LocalBlockResponder{
+ ch: make(chan blocks.Block),
+ rf: blocks.NewGenericResultFilter(),
+ }
+}
+
+// C returns the back-channel
+func (lr *LocalBlockResponder) C() <-chan blocks.Block {
+ return lr.ch
+}
+
+// Send interface method: dissect message and relay block if appropriate
+func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message)
error {
+ // check if incoming message is a DHT-RESULT
+ switch res := msg.(type) {
+ case *message.DHTP2PResultMsg:
+ // deliver incoming blocks
+ go func() {
+ lr.ch <- blocks.NewGenericBlock(res.Block)
+ }()
+ default:
+ logger.Println(logger.WARN, "[local] not a DHT-RESULT --
skipped")
+ }
return nil
}
-func (lr *LocalResponder) Receiver() string {
- return "@"
+// Receiver is nil for local responders.
+func (lr *LocalBlockResponder) Receiver() *util.PeerID {
+ return nil
}
-func (lr *LocalResponder) Close() {
+// Close back-channel
+func (lr *LocalBlockResponder) Close() {
close(lr.ch)
}
@@ -68,8 +97,9 @@ func (lr *LocalResponder) Close() {
type Module struct {
service.ModuleImpl
- store store.DHTStore // reference to the block storage mechanism
- core *core.Core // reference to core services
+ cfg *config.DHTConfig // configuraion parameters
+ store *store.DHTStore // reference to the block storage mechanism
+ core *core.Core // reference to core services
rtable *RoutingTable // routing table
lastHello *message.DHTP2PHelloMsg // last own HELLO message used;
re-create if expired
@@ -80,7 +110,7 @@ type Module struct {
// mechanism for persistence.
func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m
*Module, err error) {
// create permanent storage handler
- var storage store.DHTStore
+ var storage *store.DHTStore
if storage, err = store.NewDHTStore(cfg.Storage); err != nil {
return
}
@@ -90,6 +120,7 @@ func NewModule(ctx context.Context, c *core.Core, cfg
*config.DHTConfig) (m *Mod
// return module instance
m = &Module{
ModuleImpl: *service.NewModuleImpl(),
+ cfg: cfg,
store: storage,
core: c,
rtable: rt,
@@ -99,18 +130,58 @@ func NewModule(ctx context.Context, c *core.Core, cfg
*config.DHTConfig) (m *Mod
pulse := time.Duration(cfg.Heartbeat) * time.Second
listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat)
c.Register("dht", listener)
+
+ // run periodic tasks (8.2. peer discovery)
+ ticker := time.NewTicker(5 * time.Minute)
+ key := crypto.Hash(m.core.PeerID().Data)
+ flags := uint16(enums.DHT_RO_FIND_APPROXIMATE |
enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
+ var resCh <-chan blocks.Block
+ go func() {
+ for {
+ select {
+ // initiate peer discovery
+ case <-ticker.C:
+ // query DHT for our own HELLO block
+ query := blocks.NewGenericQuery(key,
enums.BLOCK_TYPE_DHT_HELLO, flags)
+ resCh = m.Get(ctx, query)
+
+ // handle peer discover results
+ case res := <-resCh:
+ // check for correct type
+ btype := res.Type()
+ if btype == enums.BLOCK_TYPE_DHT_HELLO {
+ hb, ok := res.(*blocks.HelloBlock)
+ if !ok {
+ logger.Printf(logger.WARN,
"[dht] peer discovery received invalid block data")
+ } else {
+ // cache HELLO block
+ m.rtable.CacheHello(hb)
+ // add sender to routing table
+
m.rtable.Add(NewPeerAddress(hb.PeerID), "dht")
+ }
+ } else {
+ logger.Printf(logger.WARN, "[dht] peer
discovery received invalid block type %s", btype.String())
+ }
+
+ // termination
+ case <-ctx.Done():
+ ticker.Stop()
+ return
+ }
+ }
+ }()
return
}
//----------------------------------------------------------------------
-// DHT methods for local use
+// DHT methods for local use (API)
//----------------------------------------------------------------------
// Get blocks from the DHT ["dht:get"]
// Locally request blocks for a given query. The res channel will deliver the
// returned results to the caller; the channel is closed if no further blocks
// are expected or the query times out.
-func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan
blocks.Block) {
+func (m *Module) Get(ctx context.Context, query blocks.Query) <-chan
blocks.Block {
// get the block handler for given block type to construct an empty
// result filter. If no handler is defined, a default PassResultFilter
// is created.
@@ -123,14 +194,14 @@ func (m *Module) Get(ctx context.Context, query
blocks.Query) (res chan blocks.B
logger.Println(logger.WARN, "[dht] unknown result filter
implementation -- skipped")
}
// get additional query parameters
- xquery, ok := util.GetParam[[]byte](query.Params(), "xquery")
+ xquery, _ := util.GetParam[[]byte](query.Params(), "xquery")
// assemble a new GET message
msg := message.NewDHTP2PGetMsg()
msg.BType = uint32(query.Type())
msg.Flags = query.Flags()
msg.HopCount = 0
- msg.ReplLevel = 10
+ msg.ReplLevel = uint16(m.cfg.Routing.ReplLevel)
msg.PeerFilter = blocks.NewPeerFilter()
msg.ResFilter = rf.Bytes()
msg.RfSize = uint16(len(msg.ResFilter))
@@ -138,15 +209,13 @@ func (m *Module) Get(ctx context.Context, query
blocks.Query) (res chan blocks.B
msg.MsgSize += msg.RfSize + uint16(len(xquery))
// compose a response channel and handler
- res = make(chan blocks.Block)
- hdlr := &LocalResponder{
- ch: res,
- }
+ hdlr := NewLocalBlockResponder()
+
// time-out handling
ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout")
if !ok {
// defaults to 10 minutes
- ttl = 600 * time.Second
+ ttl = 10 * time.Minute
}
lctx, cancel := context.WithTimeout(ctx, ttl)
@@ -159,23 +228,12 @@ func (m *Module) Get(ctx context.Context, query
blocks.Query) (res chan blocks.B
hdlr.Close()
cancel()
}()
- return res
+ return hdlr.C()
}
// GetApprox returns the first block not excluded ["dht:getapprox"]
-func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl
func(*store.DHTEntry) bool) (entry *store.DHTEntry, dist *math.Int, err error) {
- var val any
- if entry, val, err = m.store.GetApprox(query, excl); err != nil {
- return
- }
- hc, ok := val.(*crypto.HashCode)
- if !ok {
- err = errors.New("no approx result")
- }
- asked := NewQueryAddress(query.Key())
- found := NewQueryAddress(hc)
- dist, _ = found.Distance(asked)
- return
+func (m *Module) GetApprox(ctx context.Context, query blocks.Query, rf
blocks.ResultFilter) (results []*store.DHTResult, err error) {
+ return m.store.GetApprox("dht", query, rf)
}
// Put a block into the DHT ["dht:put"]
@@ -191,7 +249,7 @@ func (m *Module) Put(ctx context.Context, query
blocks.Query, block blocks.Block
msg.Flags = query.Flags()
msg.HopCount = 0
msg.PeerFilter = blocks.NewPeerFilter()
- msg.ReplLvl = 10
+ msg.ReplLvl = uint16(m.cfg.Routing.ReplLevel)
msg.Expiration = expire
msg.Block = block.Bytes()
msg.Key = query.Key().Clone()
diff --git a/src/gnunet/service/dht/path/elements.go
b/src/gnunet/service/dht/path/elements.go
index 5e26e57..53d1cfd 100644
--- a/src/gnunet/service/dht/path/elements.go
+++ b/src/gnunet/service/dht/path/elements.go
@@ -38,8 +38,8 @@ var (
//----------------------------------------------------------------------
// Entry is an element of the path list
type Entry struct {
- Signer *util.PeerID // path element signer
Signature *util.PeerSignature // path element signature
+ Signer *util.PeerID // path element signer
}
// Size returns the size of a path element in wire format
diff --git a/src/gnunet/service/dht/path/handling.go
b/src/gnunet/service/dht/path/handling.go
index 412f646..b225c82 100644
--- a/src/gnunet/service/dht/path/handling.go
+++ b/src/gnunet/service/dht/path/handling.go
@@ -35,8 +35,8 @@ import (
// path flags
const (
- PathTruncated = iota
- PathLastHop
+ PathTruncated = 1
+ PathLastHop = 2
)
// Path is the complete list of verified hops a message travelled.
@@ -48,6 +48,7 @@ type Path struct {
Expire util.AbsoluteTime `` // expiration time
TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin
(optional)
NumList uint16 `order:"big"` // number of list
entries
+ SplitPos uint16 `order:"big"` // optional split
position
List []*Entry `size:"NumList"` // list of path entries
LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature
LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id
@@ -72,6 +73,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime)
*Path {
Expire: expire,
TruncOrigin: nil,
NumList: 0,
+ SplitPos: 0,
List: make([]*Entry, 0),
LastSig: nil,
LastHop: nil,
@@ -81,7 +83,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime)
*Path {
// NewPathFromBytes reconstructs a path instance from binary data. The layout
// of the data must match with the layout used in Path.Bytes().
func NewPathFromBytes(buf []byte) (path *Path, err error) {
- if buf == nil || len(buf) == 0 {
+ if len(buf) == 0 {
return
}
path = new(Path)
@@ -116,6 +118,7 @@ func (p *Path) Clone() *Path {
Expire: p.Expire,
TruncOrigin: p.TruncOrigin,
NumList: p.NumList,
+ SplitPos: p.SplitPos,
List: util.Clone(p.List),
LastSig: p.LastSig,
LastHop: p.LastHop,
diff --git a/src/gnunet/service/dht/resulthandler.go
b/src/gnunet/service/dht/resulthandler.go
index faab93e..85895f8 100644
--- a/src/gnunet/service/dht/resulthandler.go
+++ b/src/gnunet/service/dht/resulthandler.go
@@ -22,8 +22,10 @@ import (
"bytes"
"context"
"gnunet/crypto"
+ "gnunet/enums"
"gnunet/message"
"gnunet/service/dht/blocks"
+ "gnunet/service/dht/path"
"gnunet/transport"
"gnunet/util"
"time"
@@ -52,7 +54,10 @@ type ResultHandler interface {
Done() bool
// Key returns the query/store key as string
- Key() string
+ Key() *crypto.HashCode
+
+ // Flags returns the query flags
+ Flags() uint16
// Compare two result handlers
Compare(ResultHandler) int
@@ -61,7 +66,7 @@ type ResultHandler interface {
Merge(ResultHandler) bool
// Handle result message
- Handle(context.Context, *message.DHTP2PResultMsg) bool
+ Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth
*path.Path, sender, local *util.PeerID) bool
}
// Compare return values
@@ -108,8 +113,13 @@ func (t *GenericResultHandler) ID() int {
}
// Key returns the key string
-func (t *GenericResultHandler) Key() string {
- return t.key.String()
+func (t *GenericResultHandler) Key() *crypto.HashCode {
+ return t.key
+}
+
+// Flags returns the query flags
+func (t *GenericResultHandler) Flags() uint16 {
+ return t.flags
}
// Done returns true if the result handler is no longer active.
@@ -176,15 +186,28 @@ func NewForwardResultHandler(msgIn message.Message, rf
blocks.ResultFilter, back
}
// Handle incoming DHT-P2P-RESULT message
-func (t *ForwardResultHandler) Handle(ctx context.Context, msg
*message.DHTP2PResultMsg) bool {
+func (t *ForwardResultHandler) Handle(ctx context.Context, msg
*message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool {
// don't send result if it is filtered out
if !t.Proceed(ctx, msg) {
logger.Printf(logger.DBG, "[dht-task-%d] result filtered out --
already known", t.id)
return false
}
+ // extend path if route is recorded
+ pp := pth.Clone()
+ if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 {
+ // yes: add path element for remote receivers
+ if rcv := t.resp.Receiver(); rcv != nil {
+ pe := pp.NewElement(sender, local, rcv)
+ pp.Add(pe)
+ }
+ }
+
+ // build updated PUT message
+ msgOut := msg.Update(pp)
+
// send result message back to originator (result forwarding).
logger.Printf(logger.INFO, "[dht-task-%d] sending result back to
originator", t.id)
- if err := t.resp.Send(ctx, msg); err != nil && err !=
transport.ErrEndpMaybeSent {
+ if err := t.resp.Send(ctx, msgOut); err != nil && err !=
transport.ErrEndpMaybeSent {
logger.Printf(logger.ERROR, "[dht-task-%d] sending result back
to originator failed: %s", t.id, err.Error())
return false
}
@@ -200,7 +223,7 @@ func (t *ForwardResultHandler) Compare(h ResultHandler) int
{
return RHC_DIFFER
}
// check for same recipient
- if ht.resp.Receiver() != t.resp.Receiver() {
+ if ht.resp.Receiver().Equals(t.resp.Receiver()) {
logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s",
ht.resp.Receiver(), t.resp.Receiver())
return RHC_DIFFER
}
@@ -237,7 +260,7 @@ func (t *ForwardResultHandler) Merge(h ResultHandler) bool {
//----------------------------------------------------------------------
// ResultHandlerFcn is the function prototype for custom handlers:
-type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, chan<-
any) bool
+type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg,
*path.Path, chan<- any) bool
// DirectResultHandler for local DHT-P2P-GET requests
type DirectResultHandler struct {
@@ -262,7 +285,7 @@ func NewDirectResultHandler(msgIn message.Message, rf
blocks.ResultFilter, hdlr
}
// Handle incoming DHT-P2P-RESULT message
-func (t *DirectResultHandler) Handle(ctx context.Context, msg
*message.DHTP2PResultMsg) bool {
+func (t *DirectResultHandler) Handle(ctx context.Context, msg
*message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool {
// don't send result if it is filtered out
if !t.Proceed(ctx, msg) {
logger.Printf(logger.DBG, "[dht-task-%d] result filtered out --
already known", t.id)
@@ -271,7 +294,7 @@ func (t *DirectResultHandler) Handle(ctx context.Context,
msg *message.DHTP2PRes
// check for correct message type and handler function
if t.hdlr != nil {
logger.Printf(logger.INFO, "[dht-task-%d] handling result
message", t.id)
- return t.hdlr(ctx, msg, t.rc)
+ return t.hdlr(ctx, msg, pth, t.rc)
}
return false
}
@@ -318,7 +341,7 @@ func NewResultHandlerList() *ResultHandlerList {
// Add handler to list
func (t *ResultHandlerList) Add(hdlr ResultHandler) bool {
// get current list of handlers for key
- key := hdlr.Key()
+ key := hdlr.Key().String()
list, ok := t.list.Get(key, 0)
modified := false
if !ok {
diff --git a/src/gnunet/service/dht/routingtable.go
b/src/gnunet/service/dht/routingtable.go
index 98f0819..20bf5fc 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -25,6 +25,7 @@ import (
"gnunet/config"
"gnunet/crypto"
"gnunet/service/dht/blocks"
+ "gnunet/service/store"
"gnunet/util"
"sync"
"time"
@@ -84,11 +85,7 @@ func (addr *PeerAddress) Equals(p *PeerAddress) bool {
// Distance between two addresses: returns a distance value and a
// bucket index (smaller index = less distant).
func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) {
- d := make([]byte, 64)
- for i := range d {
- d[i] = addr.Key.Bits[i] ^ p.Key.Bits[i]
- }
- r := math.NewIntFromBytes(d)
+ r := util.Distance(addr.Key.Bits, p.Key.Bits)
return r, 512 - r.BitLen()
}
@@ -371,18 +368,29 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
//----------------------------------------------------------------------
-func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter)
(hb *blocks.HelloBlock, dist *math.Int) {
- // iterate over cached HELLOs to find (best) match first
- _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock,
_ int) error {
+// LookupHello returns blocks from the HELLO cache for given query.
+func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter,
approx bool) (results []*store.DHTResult) {
+ // iterate over cached HELLOs to find matches;
+ // approximate search is limited by distance (max. diff for bucket
index is 16)
+ _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock,
_ int) error {
// check if block is excluded by result filter
- if !rf.Contains(val) {
- // check for better match
- p := NewPeerAddress(val.PeerID)
- d, _ := addr.Distance(p)
- if hb == nil || d.Cmp(dist) < 0 {
- hb = val
- dist = d
+ var result *store.DHTResult
+ if !rf.Contains(hb) {
+ // no: possible result, compute distance
+ p := NewPeerAddress(hb.PeerID)
+ dist, idx := addr.Distance(p)
+ result = &store.DHTResult{
+ Entry: &store.DHTEntry{
+ Blk: hb,
+ },
+ Dist: dist,
+ }
+ // check if we need to add result
+ if (approx && idx < 16) || idx == 0 {
+ results = append(results, result)
}
+ } else {
+ logger.Printf(logger.DBG, "[RT] GET-HELLO: cache block
is filtered")
}
return nil
}, true)
diff --git a/src/gnunet/service/dht/routingtable_test.go
b/src/gnunet/service/dht/routingtable_test.go
index 42e2349..02ddc52 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -70,7 +70,7 @@ func TestRT(t *testing.T) {
// helper functions
genRemotePeer := func() *PeerAddress {
d := make([]byte, 32)
- _, _ = rand.Read(d)
+ _, _ = rand.Read(d) //nolint:gosec // good enough for testing
return NewPeerAddress(util.NewPeerID(d))
}
@@ -86,10 +86,10 @@ func TestRT(t *testing.T) {
for i := range tasks {
tasks[i] = new(Entry)
tasks[i].addr = genRemotePeer()
- tasks[i].born = rand.Int63n(EPOCHS)
- tasks[i].ttl = 1000 + rand.Int63n(7000)
- tasks[i].drop = 2000 + rand.Int63n(3000)
- tasks[i].revive = rand.Int63n(2000)
+ tasks[i].born = rand.Int63n(EPOCHS) //nolint:gosec // good
enough for testing
+ tasks[i].ttl = 1000 + rand.Int63n(7000) //nolint:gosec // good
enough for testing
+ tasks[i].drop = 2000 + rand.Int63n(3000) //nolint:gosec // good
enough for testing
+ tasks[i].revive = rand.Int63n(2000) //nolint:gosec // good
enough for testing
tasks[i].online = false
}
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index c431d48..0f4781d 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -200,6 +200,7 @@ func (m *Module) ResolveAbsolute(
// ResolveRelative resolves a relative path (to a given zone) recursively by
// processing simple (PKEY,Label) lookups in sequence and handle intermediate
// GNS record types
+//nolint:gocyclo // life sometimes is complex...
func (m *Module) ResolveRelative(
ctx context.Context,
labels []string,
diff --git a/src/gnunet/service/namecache/module.go
b/src/gnunet/service/namecache/module.go
index 642355b..616c0b5 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -20,6 +20,7 @@ package namecache
import (
"context"
+ "errors"
"gnunet/config"
"gnunet/core"
"gnunet/service"
@@ -39,7 +40,7 @@ import (
type Module struct {
service.ModuleImpl
- cache store.DHTStore // transient block cache
+ cache *store.DHTStore // transient block cache
}
// NewModule creates a new module instance.
@@ -67,13 +68,18 @@ func (m *Module) Import(fcm map[string]any) {
//----------------------------------------------------------------------
-// Get an entry from the cache if available.
+// Get entry from the cache if available.
func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block
*blocks.GNSBlock, err error) {
- var e *store.DHTEntry
- if e, err = m.cache.Get(query); err != nil {
+ var e []*store.DHTEntry
+ rf := blocks.NewGenericResultFilter()
+ if e, err = m.cache.Get("namecache", query, rf); err != nil {
return
}
- err = blocks.Unwrap(e.Blk, block)
+ if len(e) != 1 {
+ err = errors.New("only one DHT entry exppected")
+ } else {
+ err = blocks.Unwrap(e[0].Blk, block)
+ }
return
}
diff --git a/src/gnunet/service/store/store_fs.go
b/src/gnunet/service/store/store_dht.go
similarity index 62%
rename from src/gnunet/service/store/store_fs.go
rename to src/gnunet/service/store/store_dht.go
index bbbe7cc..802cf60 100644
--- a/src/gnunet/service/store/store_fs.go
+++ b/src/gnunet/service/store/store_dht.go
@@ -21,6 +21,7 @@ package store
import (
"encoding/hex"
"fmt"
+ "gnunet/crypto"
"gnunet/service/dht/blocks"
"gnunet/service/dht/path"
"gnunet/util"
@@ -35,9 +36,60 @@ import (
// Filesystem-based storage
//============================================================
-// FileStore implements a filesystem-based storage mechanism for
+//------------------------------------------------------------
+// DHT entry is an entity stored in the DHT
+//------------------------------------------------------------
+
+// DHTEntry to be stored to/retrieved from local storage
+type DHTEntry struct {
+ Blk blocks.Block // reference to DHT block
+ Path *path.Path // associated put path
+}
+
+//------------------------------------------------------------
+// DHT result is a single DHT result
+//------------------------------------------------------------
+
+// Result as returned by local DHT queries
+type DHTResult struct {
+ Entry *DHTEntry // reference to DHT entry
+ Dist *math.Int // distance of entry to query key
+}
+
+//------------------------------------------------------------
+
+type DHTResultSet struct {
+ list []*DHTResult // list of DHT results
+ pos int // iterator position
+}
+
+func NewDHTResultSet() *DHTResultSet {
+ return &DHTResultSet{
+ list: make([]*DHTResult, 0),
+ pos: 0,
+ }
+}
+
+func (rs *DHTResultSet) Add(r *DHTResult) {
+ rs.list = append(rs.list, r)
+}
+
+func (rs *DHTResultSet) Next() (result *DHTResult) {
+ if rs.pos == len(rs.list) {
+ return nil
+ }
+ result = rs.list[rs.pos]
+ rs.pos++
+ return
+}
+
+//------------------------------------------------------------
+// DHT store
+//------------------------------------------------------------
+
+// DHTStore implements a filesystem-based storage mechanism for
// DHT queries and blocks.
-type FileStore struct {
+type DHTStore struct {
path string // storage path
cache bool // storage works as cache
args util.ParameterSet // arguments / settings
@@ -53,10 +105,10 @@ type FileStore struct {
size int // size of cache (number of entries)
}
-// NewFileStore instantiates a new file storage.
-func NewFileStore(spec util.ParameterSet) (DHTStore, error) {
+// NewDHTStore instantiates a new file storage handler.
+func NewDHTStore(spec util.ParameterSet) (*DHTStore, error) {
// create file store handler
- fs := new(FileStore)
+ fs := new(DHTStore)
fs.args = spec
// get parameter
@@ -93,7 +145,7 @@ func NewFileStore(spec util.ParameterSet) (DHTStore, error) {
}
// Close file storage.
-func (s *FileStore) Close() (err error) {
+func (s *DHTStore) Close() (err error) {
if !s.cache {
// close database connection
err = s.meta.Close()
@@ -102,7 +154,7 @@ func (s *FileStore) Close() (err error) {
}
// Put block into storage under given key
-func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
+func (s *DHTStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
// check for free space
if !s.cache {
if int(s.totalSize>>30) > s.maxSpace {
@@ -122,9 +174,10 @@ func (s *FileStore) Put(query blocks.Query, entry
*DHTEntry) (err error) {
// compile metadata
now := util.AbsoluteTimeNow()
meta := &FileMetadata{
- key: query.Key().Bits,
+ key: query.Key(),
size: uint64(blkSize),
btype: btype,
+ bhash: crypto.Hash(entry.Blk.Bytes()),
expires: expire,
stored: now,
lastUsed: now,
@@ -146,75 +199,73 @@ func (s *FileStore) Put(query blocks.Query, entry
*DHTEntry) (err error) {
}
// Get block with given key from storage
-func (s *FileStore) Get(query blocks.Query) (entry *DHTEntry, err error) {
+func (s *DHTStore) Get(label string, query blocks.Query, rf
blocks.ResultFilter) (results []*DHTEntry, err error) {
// check if we have metadata for the query
- key := query.Key().Bits
- btype := query.Type()
- var md *FileMetadata
- if md, err = s.meta.Get(key, btype); err != nil || md == nil {
+ var mds []*FileMetadata
+ if mds, err = s.meta.Get(query); err != nil || len(mds) == 0 {
return
}
- // check for expired entry
- if md.expires.Expired() {
- err = s.dropFile(md)
- return
- }
- // mark the block as newly used
- if err = s.meta.Used(key, btype); err != nil {
- return
+ // traverse list of results
+ for _, md := range mds {
+ // check for expired entry
+ if md.expires.Expired() {
+ if err = s.dropFile(md); err != nil {
+ logger.Printf(logger.ERROR, "[%s] can't drop
DHT file: %s", label, err)
+ }
+ continue
+ }
+ // check for filtered block
+ if rf.ContainsHash(md.bhash) {
+ continue
+ }
+ // read entry from storage
+ var entry *DHTEntry
+ if entry, err = s.readEntry(md.key.Bits); err != nil {
+ logger.Printf(logger.ERROR, "[%s] can't read DHT entry:
%s", label, err)
+ continue
+ }
+ results = append(results, entry)
+ // mark the block as newly used
+ if err = s.meta.Used(md.key.Bits, md.btype); err != nil {
+ logger.Printf(logger.ERROR, "[%s] can't flag DHT entry
as used: %s", label, err)
+ continue
+ }
}
- return s.readEntry(key)
+ return
}
// GetApprox returns the best-matching value with given key from storage
// that is not excluded
-func (s *FileStore) GetApprox(query blocks.Query, excl func(*DHTEntry) bool)
(entry *DHTEntry, key any, err error) {
- var bestKey []byte
- var bestEntry *DHTEntry
- var bestDist *math.Int
- // distance function
- dist := func(a, b []byte) *math.Int {
- c := make([]byte, len(a))
- for i := range a {
- c[i] = a[i] ^ b[i]
+func (s *DHTStore) GetApprox(label string, query blocks.Query, rf
blocks.ResultFilter) (results []*DHTResult, err error) {
+ // iterate over all keys; process each metadata instance
+ // (append to results if appropriate)
+ process := func(md *FileMetadata) {
+ // check for filtered block.
+ if rf.ContainsHash(md.bhash) {
+ // filtered out...
+ return
}
- return math.NewIntFromBytes(c)
- }
- // iterate over all keys
- check := func(md *FileMetadata) {
- // check for better match
- d := dist(md.key, query.Key().Bits)
- var entry *DHTEntry
- if bestKey == nil || d.Cmp(bestDist) < 0 {
- // we might have a match. check block for exclusion
- if entry, err = s.readEntry(md.key); err != nil {
- logger.Printf(logger.ERROR, "[dhtstore] failed
to retrieve block for %s", hex.EncodeToString(md.key))
- return
- }
- if excl(entry) {
- return
- }
- // remember best match
- bestKey = md.key
- bestEntry = entry
- bestDist = d
+ // check distance (max. 16 bucktes off)
+ dist := util.Distance(md.key.Bits, query.Key().Bits)
+ if (512 - dist.BitLen()) > 16 {
+ return
}
- }
- if err = s.meta.Traverse(check); err != nil {
- return
- }
- if bestEntry != nil {
- // mark the block as newly used
- if err = s.meta.Used(bestKey, bestEntry.Blk.Type()); err != nil
{
+ // read entry from storage
+ var entry *DHTEntry
+ if entry, err = s.readEntry(md.key.Bits); err != nil {
+ logger.Printf(logger.ERROR, "[%s] failed to retrieve
block for %s", label, md.key.String())
return
}
+ // add to result list
+ result := &DHTResult{
+ Entry: entry,
+ Dist: dist,
+ }
+ results = append(results, result)
}
- return bestEntry, bestDist, nil
-}
-
-// Get a list of all stored block keys (generic query).
-func (s *FileStore) List() ([]blocks.Query, error) {
- return nil, ErrStoreNoList
+ // traverse mestadata database
+ err = s.meta.Traverse(process)
+ return
}
//----------------------------------------------------------------------
@@ -227,7 +278,7 @@ type entryLayout struct {
}
// read entry from storage for given key
-func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) {
+func (s *DHTStore) readEntry(key []byte) (entry *DHTEntry, err error) {
// get path and filename from key
folder, fname := s.expandPath(key)
@@ -255,7 +306,7 @@ func (s *FileStore) readEntry(key []byte) (entry *DHTEntry,
err error) {
}
// write entry to storage for given key
-func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) {
+func (s *DHTStore) writeEntry(key []byte, entry *DHTEntry) (err error) {
// get folder and filename from key
folder, fname := s.expandPath(key)
// make sure the folder exists
@@ -287,14 +338,14 @@ func (s *FileStore) writeEntry(key []byte, entry
*DHTEntry) (err error) {
//----------------------------------------------------------------------
// expandPath returns the full path to the file for given key.
-func (s *FileStore) expandPath(key []byte) (string, string) {
+func (s *DHTStore) expandPath(key []byte) (string, string) {
h := hex.EncodeToString(key)
return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:]
}
// Prune list of file headers so we drop at least n entries.
// returns number of removed entries.
-func (s *FileStore) prune(n int) (del int) {
+func (s *DHTStore) prune(n int) (del int) {
// collect obsolete records
obsolete, err := s.meta.Obsolete(n)
if err != nil {
@@ -311,16 +362,17 @@ func (s *FileStore) prune(n int) (del int) {
}
// drop file removes a file from metadatabase and the physical storage.
-func (s *FileStore) dropFile(md *FileMetadata) (err error) {
+func (s *DHTStore) dropFile(md *FileMetadata) (err error) {
// adjust total size
s.totalSize -= md.size
// remove from database
- if err = s.meta.Drop(md.key, md.btype); err != nil {
+ if err = s.meta.Drop(md.key.Bits, md.btype); err != nil {
logger.Printf(logger.ERROR, "[store] can't remove metadata
(%s,%d): %s", md.key, md.btype, err.Error())
return
}
// remove from filesystem
- path := fmt.Sprintf("%s/%s/%s/%s", s.path, md.key[:2], md.key[2:4],
md.key[4:])
+ h := hex.EncodeToString(md.key.Bits)
+ path := fmt.Sprintf("%s/%s/%s/%s", s.path, h[:2], h[2:4], h[4:])
if err = os.Remove(path); err != nil {
logger.Printf(logger.ERROR, "[store] can't remove file %s: %s",
path, err.Error())
}
diff --git a/src/gnunet/service/store/store_fs_meta.go
b/src/gnunet/service/store/store_dht_meta.go
similarity index 67%
rename from src/gnunet/service/store/store_fs_meta.go
rename to src/gnunet/service/store/store_dht_meta.go
index 5710286..9ebe387 100644
--- a/src/gnunet/service/store/store_fs_meta.go
+++ b/src/gnunet/service/store/store_dht_meta.go
@@ -21,7 +21,9 @@ package store
import (
"database/sql"
_ "embed"
+ "gnunet/crypto"
"gnunet/enums"
+ "gnunet/service/dht/blocks"
"gnunet/util"
"os"
)
@@ -33,21 +35,30 @@ import (
// FileMetadata holds information about a file (raw block data)
// and is stored in a SQL database for faster access.
type FileMetadata struct {
- key []byte // storage key
+ key *crypto.HashCode // storage key
size uint64 // size of file
btype enums.BlockType // block type
+ bhash *crypto.HashCode // block hash
stored util.AbsoluteTime // time added to store
expires util.AbsoluteTime // expiration time
lastUsed util.AbsoluteTime // time last used
usedCount uint64 // usage count
}
+// NewFileMetadata creates a new file metadata instance
+func NewFileMetadata() *FileMetadata {
+ return &FileMetadata{
+ key: crypto.NewHashCode(nil),
+ bhash: crypto.NewHashCode(nil),
+ }
+}
+
//------------------------------------------------------------
// Metadata database: A SQLite3 database to hold metadata about
// blocks in file storage
//------------------------------------------------------------
-//go:embed store_fs_meta.sql
+//go:embed store_dht_meta.sql
var initScript []byte
// FileMetaDB is a SQLite3 database for block metadata
@@ -86,42 +97,67 @@ func OpenMetaDB(path string) (db *FileMetaDB, err error) {
// Store metadata in database: creates or updates a record for the metadata
// in the database; primary key is the query key
func (db *FileMetaDB) Store(md *FileMetadata) (err error) {
- sql := "replace into
meta(qkey,btype,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?)"
- _, err = db.conn.Exec(sql, md.key, md.btype, md.size,
md.stored.Epoch(), md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount)
+ sql := "replace into
meta(qkey,btype,bhash,size,stored,expires,lastUsed,usedCount)
values(?,?,?,?,?,?,?,?)"
+ _, err = db.conn.Exec(sql,
+ md.key.Bits, md.btype, md.bhash.Bits, md.size,
md.stored.Epoch(),
+ md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount)
return
}
// Get block metadata from database
-func (db *FileMetaDB) Get(key []byte, btype enums.BlockType) (md
*FileMetadata, err error) {
- md = new(FileMetadata)
- md.key = util.Clone(key)
- md.btype = btype
- stmt := "select size,stored,expires,lastUsed,usedCount from meta where
qkey=? and btype=?"
- row := db.conn.QueryRow(stmt, key, btype)
- var st, exp, lu uint64
- if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err != nil {
- if err == sql.ErrNoRows {
- md = nil
- err = nil
- }
+func (db *FileMetaDB) Get(query blocks.Query) (mds []*FileMetadata, err error)
{
+ // select rows in database matching the query
+ stmt := "select size,bhash,stored,expires,lastUsed,usedCount from meta
where qkey=?"
+ btype := query.Type()
+ var rows *sql.Rows
+ if btype == enums.BLOCK_TYPE_ANY {
+ rows, err = db.conn.Query(stmt, query.Key().Bits)
} else {
+ rows, err = db.conn.Query(stmt+" and btype=?",
query.Key().Bits, btype)
+ }
+ if err != nil {
+ return
+ }
+ // process results
+ for rows.Next() {
+ md := NewFileMetadata()
+ md.key = query.Key()
+ md.btype = btype
+ var st, exp, lu uint64
+ if err = rows.Scan(&md.size, &md.bhash.Bits, &st, &exp, &lu,
&md.usedCount); err != nil {
+ if err == sql.ErrNoRows {
+ md = nil
+ err = nil
+ }
+ return
+ }
md.stored.Val = st * 1000000
md.expires.Val = exp * 1000000
md.lastUsed.Val = lu * 1000000
+ mds = append(mds, md)
}
return
}
// Drop metadata for block from database
-func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) error {
- _, err := db.conn.Exec("delete from meta where qkey=? and btype=?",
key, btype)
- return err
+func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) (err error) {
+ if btype != enums.BLOCK_TYPE_ANY {
+ _, err = db.conn.Exec("delete from meta where qkey=? and
btype=?", key, btype)
+ } else {
+ _, err = db.conn.Exec("delete from meta where qkey=?", key)
+ }
+ return
}
// Used a block from store: increment usage count and lastUsed time.
-func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) error {
- _, err := db.conn.Exec("update meta set
usedCount=usedCount+1,lastUsed=unixepoch() where qkey=? and btype=?", key,
btype)
- return err
+func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) (err error) {
+ stmt := "update meta set usedCount=usedCount+1,lastUsed=unixepoch()
where qkey=?"
+ if btype != enums.BLOCK_TYPE_ANY {
+ _, err = db.conn.Exec(stmt+" and btype=?", key, btype)
+ } else {
+ _, err = db.conn.Exec(stmt, key)
+ }
+ return
}
// Obsolete collects records from the meta database that are considered
@@ -150,15 +186,15 @@ func (db *FileMetaDB) Obsolete(n int) (removable
[]*FileMetadata, err error) {
// Traverse metadata records and call function on each record
func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error {
- sql := "select qkey,btype,size,stored,expires,lastUsed,usedCount from
meta"
+ sql := "select qkey,btype,bhash,size,stored,expires,lastUsed,usedCount
from meta"
rows, err := db.conn.Query(sql)
if err != nil {
return err
}
- md := new(FileMetadata)
+ md := NewFileMetadata()
for rows.Next() {
var st, exp, lu uint64
- err = rows.Scan(&md.key, &md.btype, &md.size, &st, &exp, &lu,
&md.usedCount)
+ err = rows.Scan(&md.key.Bits, &md.btype, &md.bhash.Bits,
&md.size, &st, &exp, &lu, &md.usedCount)
if err != nil {
return err
}
diff --git a/src/gnunet/service/store/store_fs_meta.sql
b/src/gnunet/service/store/store_dht_meta.sql
similarity index 96%
rename from src/gnunet/service/store/store_fs_meta.sql
rename to src/gnunet/service/store/store_dht_meta.sql
index a2692ab..ba9fe1f 100644
--- a/src/gnunet/service/store/store_fs_meta.sql
+++ b/src/gnunet/service/store/store_dht_meta.sql
@@ -19,6 +19,7 @@
create table meta (
qkey blob, -- key (SHA512 hash)
btype integer, -- block type
+ bhash blob, -- block hash
size integer, -- size of file
stored integer, -- time added to store
expires integer, -- expiration time
diff --git a/src/gnunet/service/store/dhtstore_test.go
b/src/gnunet/service/store/store_dht_test.go
similarity index 83%
rename from src/gnunet/service/store/dhtstore_test.go
rename to src/gnunet/service/store/store_dht_test.go
index ba5fae2..9f80380 100644
--- a/src/gnunet/service/store/dhtstore_test.go
+++ b/src/gnunet/service/store/store_dht_test.go
@@ -52,21 +52,27 @@ func TestDHTFilesStore(t *testing.T) {
// create file store
if _, err := os.Stat(path); err != nil {
- os.MkdirAll(path, 0755)
+ if err = os.MkdirAll(path, 0755); err != nil {
+ t.Fatal(err)
+ }
}
- fs, err := NewFileStore(cfg)
+ fs, err := NewDHTStore(cfg)
if err != nil {
t.Fatal(err)
}
// allocate keys
keys := make([]blocks.Query, 0, fsNumBlocks)
+ // create result filter
+ rf := blocks.NewGenericResultFilter()
// First round: save blocks
for i := 0; i < fsNumBlocks; i++ {
// generate random block
- size := 1024 + rand.Intn(62000)
+ size := 1024 + rand.Intn(62000) //nolint:gosec // good enough
for testing
buf := make([]byte, size)
- rand.Read(buf)
+ if _, err = rand.Read(buf); err != nil { //nolint:gosec // good
enough for testing
+ t.Fatal(err)
+ }
blk := blocks.NewGenericBlock(buf)
// generate associated key
k := crypto.Hash(buf)
@@ -86,11 +92,14 @@ func TestDHTFilesStore(t *testing.T) {
// Second round: retrieve blocks and check
for i, key := range keys {
// get block
- val, err := fs.Get(key)
+ vals, err := fs.Get("test", key, rf)
if err != nil {
t.Fatalf("[%d] %s", i, err)
}
- buf := val.Blk.Bytes()
+ if len(vals) != 1 {
+ t.Fatalf("[%d] only one result expected", i)
+ }
+ buf := vals[0].Blk.Bytes()
// re-create key
k := crypto.Hash(buf)
diff --git a/src/gnunet/service/store/store.go
b/src/gnunet/service/store/store_kv.go
similarity index 73%
rename from src/gnunet/service/store/store.go
rename to src/gnunet/service/store/store_kv.go
index ed55547..8bcb711 100644
--- a/src/gnunet/service/store/store.go
+++ b/src/gnunet/service/store/store_kv.go
@@ -24,8 +24,6 @@ import (
_ "embed" // use embedded filesystem
"errors"
"fmt"
- "gnunet/service/dht/blocks"
- "gnunet/service/dht/path"
"gnunet/util"
redis "github.com/go-redis/redis/v8"
@@ -40,68 +38,24 @@ var (
ErrStoreNoList = fmt.Errorf("no key listing for store defined")
)
-//------------------------------------------------------------
-// Generic storage interface. Can be used for persistent or
-// transient (caching) storage of key/value data.
//------------------------------------------------------------
-// Store is a key/value storage where the type of the key is either
-// a SHA512 hash value or a string and the value is either a DHT
-// block or a string. It is possiblle to mix any key/value types,
-// but not used in this implementation.
-type Store[K, V any] interface {
+// KVStore us a eneric key/value storage interface. Can be used for
+// persistent or transient (caching) storage of stringed key/value data.
+type KVStore interface {
// Put value into storage under given key
- Put(key K, val V) error
+ Put(key string, val string) error
// Get value with given key from storage
- Get(key K) (V, error)
-
- // GetApprox returns the best-matching value with given key from storage
- // that is not excluded.
- GetApprox(key K, excl func(V) bool) (V, any, error)
+ Get(key string) (string, error)
// List all store keys
- List() ([]K, error)
+ List() ([]string, error)
// Close store
Close() error
}
-//------------------------------------------------------------
-// Types for custom store requirements
-//------------------------------------------------------------
-
-// DHTEntry to be stored/retrieved
-type DHTEntry struct {
- Blk blocks.Block
- Path *path.Path
-}
-
-// DHTStore for DHT queries and blocks
-type DHTStore Store[blocks.Query, *DHTEntry]
-
-// KVStore for key/value string pairs
-type KVStore Store[string, string]
-
-//------------------------------------------------------------
-// NewDHTStore creates a new storage handler with given spec
-// for use with DHT queries and blocks
-func NewDHTStore(spec util.ParameterSet) (DHTStore, error) {
- // get the mode parameter
- mode, ok := util.GetParam[string](spec, "mode")
- if !ok {
- return nil, ErrStoreInvalidSpec
- }
- switch mode {
- //------------------------------------------------------------------
- // File-base storage
- //------------------------------------------------------------------
- case "file":
- return NewFileStore(spec)
- }
- return nil, ErrStoreUnknown
-}
-
//------------------------------------------------------------
// NewKVStore creates a new storage handler with given spec
// for use with key/value string pairs.
@@ -178,11 +132,6 @@ func (s *RedisStore) Get(key string) (value string, err
error) {
return s.client.Get(context.TODO(), key).Result()
}
-// GetApprox returns the best-matching value for given key from storage
-func (s *RedisStore) GetApprox(key string, crit func(string) bool) (value
string, vkey any, err error) {
- return "", "", ErrStoreNoApprox
-}
-
// List all keys in store
func (s *RedisStore) List() (keys []string, err error) {
var (
@@ -255,11 +204,6 @@ func (s *SQLStore) Get(key string) (value string, err
error) {
return
}
-// GetApprox returns the best-matching value for given key from storage
-func (s *SQLStore) GetApprox(key string, crit func(string) bool) (value
string, vkey any, err error) {
- return "", "", ErrStoreNoApprox
-}
-
// List all keys in store
func (s *SQLStore) List() (keys []string, err error) {
var (
diff --git a/src/gnunet/transport/responder.go
b/src/gnunet/transport/responder.go
index 7032c78..be958bf 100644
--- a/src/gnunet/transport/responder.go
+++ b/src/gnunet/transport/responder.go
@@ -33,8 +33,9 @@ type Responder interface {
// Handle outgoing message
Send(ctx context.Context, msg message.Message) error
- // Receiver returns the receiving peer (string representation)
- Receiver() string
+ // Receiver returns the receiving peer. Returns nil if
+ // this is a local responder (service.Connection)
+ Receiver() *util.PeerID
}
//----------------------------------------------------------------------
@@ -55,6 +56,6 @@ func (r *TransportResponder) Send(ctx context.Context, msg
message.Message) erro
}
// Receiver returns the receiving peer id
-func (r *TransportResponder) Receiver() string {
- return r.Peer.String()
+func (r *TransportResponder) Receiver() *util.PeerID {
+ return r.Peer
}
diff --git a/src/gnunet/util/map.go b/src/gnunet/util/map.go
index ab48089..2237188 100644
--- a/src/gnunet/util/map.go
+++ b/src/gnunet/util/map.go
@@ -152,7 +152,7 @@ func (m *Map[K, V]) GetRandom(pid int) (key K, value V, ok
bool) {
ok = false
if size := m.Size(); size > 0 {
- idx := rand.Intn(size)
+ idx := rand.Intn(size) //nolint:gosec // good enough for
selection
for key, value = range m.list {
if idx == 0 {
ok = true
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index 562bdaa..a157ec4 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -24,6 +24,7 @@ import (
"strings"
"github.com/bfix/gospel/data"
+ "github.com/bfix/gospel/math"
)
//----------------------------------------------------------------------
@@ -76,6 +77,16 @@ func GetParam[V any](params ParameterSet, key string) (i V,
ok bool) {
// additional helpers
//----------------------------------------------------------------------
+// Distance returns the XOR distance between to byte arrays
+func Distance(a, b []byte) *math.Int {
+ size := len(a)
+ d := make([]byte, size)
+ for i := range d {
+ d[i] = a[i] ^ b[i]
+ }
+ return math.NewIntFromBytes(d)
+}
+
// StripPathRight returns a dot-separated path without
// its last (right-most) element.
func StripPathRight(s string) string {
diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go
index 62ae2e2..bcddad6 100644
--- a/src/gnunet/util/peer.go
+++ b/src/gnunet/util/peer.go
@@ -38,7 +38,7 @@ func NewPeerPublicKey(data []byte) *PeerPublicKey {
pk := new(PeerPublicKey)
size := pk.Size()
v := make([]byte, size)
- if data != nil && len(data) > 0 {
+ if len(data) > 0 {
if uint(len(data)) < size {
CopyAlignedBlock(v, data)
} else {
@@ -68,7 +68,7 @@ func (pk *PeerPublicKey) Verify(data []byte, sig
*PeerSignature) (bool, error) {
// Peer identifier:
//----------------------------------------------------------------------
-// PeerID is a wrpped PeerPublicKey
+// PeerID is a wrapped PeerPublicKey
type PeerID struct {
PeerPublicKey
}
@@ -109,7 +109,7 @@ func NewPeerSignature(data []byte) *PeerSignature {
s := new(PeerSignature)
size := s.Size()
v := make([]byte, size)
- if data != nil && len(data) > 0 {
+ if len(data) > 0 {
if uint(len(data)) < size {
CopyAlignedBlock(v, data)
} else {
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-go] branch master updated: Milestone 2+3 (NLnet funding),
gnunet <=