[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30019 - in gnunet-java/src: main/java/org/gnunet/util test/java/org/gnunet/util
From: gnunet
Subject: [GNUnet-SVN] r30019 - in gnunet-java/src: main/java/org/gnunet/util test/java/org/gnunet/util
Date: Wed, 9 Oct 2013 02:39:02 +0200
Author: dold
Date: 2013-10-09 02:39:02 +0200 (Wed, 09 Oct 2013)
New Revision: 30019
Modified:
gnunet-java/src/main/java/org/gnunet/util/Client.java
gnunet-java/src/main/java/org/gnunet/util/Connection.java
gnunet-java/src/main/java/org/gnunet/util/Helper.java
gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java
gnunet-java/src/main/java/org/gnunet/util/Resolver.java
gnunet-java/src/main/java/org/gnunet/util/Scheduler.java
gnunet-java/src/main/java/org/gnunet/util/Server.java
gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java
Log:
- use MST in Connection instead of custom message tokenization
- remove ReceiveHandle, be closer to the C API
- add oneShot option to MST
- fix problems in MST
Modified: gnunet-java/src/main/java/org/gnunet/util/Client.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Client.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Client.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -146,8 +146,8 @@
* @param timeout deadline after which MessageReceiver.deadline will be
called
* @param receiver MessageReceiver that is responsible for the received
message
*/
- public Cancelable receiveOne(RelativeTime timeout, MessageReceiver
receiver) {
- return connection.receive(timeout, receiver);
+ public void receiveOne(RelativeTime timeout, MessageReceiver receiver) {
+ connection.receive(timeout, receiver);
}
/**
Modified: gnunet-java/src/main/java/org/gnunet/util/Connection.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Connection.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Connection.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -21,8 +21,6 @@
package org.gnunet.util;
import org.gnunet.construct.Construct;
-import org.gnunet.construct.MessageLoader;
-import org.gnunet.construct.ProtocolViolationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,18 +64,15 @@
private Cancelable connectHandle = null;
/**
- * The ReceiveHelper responsible for receiving a whole message from the
service
+ * The ReceiveTask responsible for receiving a whole message from the
service
* and calling the respective MessageReceiver.
*/
- private ReceiveHelper currentReceiveHelper = null;
+ private Scheduler.TaskIdentifier receiveTaskId;
- /**
- * The buffer with the (partial) message received from the service.
- * Initially, this buffer has the size of the smallest possible messages,
but grows when
- * receiving larger messages.
- */
- private ByteBuffer recvBuffer =
ByteBuffer.allocate(GnunetMessage.Header.SIZE);
+ private MessageReceiver currentReceiver;
+ private AbsoluteTime receiveDeadline;
+
/**
* The handle for the current transmission. Writes data to the socket.
*/
@@ -91,12 +86,21 @@
private TransmitHelper nextTransmitHelper = null;
/**
+ * Has the last call to the mst produced a message?
+ */
+ private boolean processedMessage;
+
+ /**
* The transmitters passed to transmitReadyNotify(...) write to this
buffer by calling
* methods on the MessageSink passed to the
Transmitter.transmit(MessageSink s) method.
* Initially, this buffer has the size of the smallest possible messages,
but grows when
* transmitting larger messages.
*/
private ByteBuffer transmitBuffer =
ByteBuffer.allocate(GnunetMessage.Header.SIZE);
+
+ /**
+ * Has the connection been disconnected?
+ */
private boolean disconnected = false;
/**
@@ -105,10 +109,14 @@
private Scheduler.TaskIdentifier notifyConnectedTimeout;
/**
- * Continuation to call when connected
+ * Continuation to call when connected.
*/
private Continuation notifyConnectedContinuation;
+ /**
+ * Message stream tokenizer for messages received from the connection.
+ */
+ private MessageStreamTokenizer mst;
/**
* An address probe is a connection to a socket that may succeed or not.
@@ -152,137 +160,60 @@
}
/**
- * The ReceiveHelper is responsible for receiving a whole
+ * The ReceiveTask is responsible for receiving a whole
* GnunetMessage and call the respective MessageReceiver with the message
on success,
* and null on failure or timeout.
*/
- private class ReceiveHelper implements Scheduler.Task {
- private MessageReceiver receiver;
- private RelativeTime timeout;
- private GnunetMessage.Header msgh = null;
- private Scheduler.TaskIdentifier recvTask = null;
- private boolean finished = false;
- // is this receiver actively working? if not, the connection process
has to kick off the receiver
- // (or select behaves badly)
- private boolean working = false;
+ private class ReceiveTask implements Scheduler.Task {
+ /**
+ * The task object's work is over, either because it succeeded at its
job,
+ * or it has been canceled.
+ */
+ public boolean done;
- public ReceiveHelper(MessageReceiver receiver, RelativeTime timeout) {
- this.receiver = receiver;
- this.timeout = timeout;
+ private void error() {
+ currentReceiver.handleError();
+ done = true;
}
- public void dispatchMessage() {
- assert msgh != null;
- currentReceiveHelper = null;
- finished = true;
- recvBuffer.flip();
-
- boolean found = true;
- Class unionClass = null;
-
- try {
- unionClass =
MessageLoader.getUnionClass(GnunetMessage.Body.class, msgh.messageType);
- } catch (ProtocolViolationException e) {
- found = false;
- }
-
- logger.debug("dispatching received message");
- if (found) {
- GnunetMessage msg;
- try {
- msg = Construct.parseAs(recvBuffer, GnunetMessage.class);
- } catch (OutOfMemoryError e) {
- throw new OutOfMemoryError("oom while parsing " +
unionClass);
- }
- receiver.process(msg.body);
- } else {
- UnknownMessageBody b = new UnknownMessageBody();
- b.id = msgh.messageType;
-
- // may throw exception, doesn't matter as it's the last call
- receiver.process(b);
- }
- }
-
@Override
public void run(Scheduler.RunContext ctx) {
- recvTask = null;
+ if (currentReceiver == null) {
+ throw new AssertionError();
+ }
+ receiveTaskId = null;
if (ctx.reasons.contains(Scheduler.Reason.TIMEOUT)) {
- currentReceiveHelper = null;
- receiver.handleError();
+ error();
} else if (ctx.reasons.contains(Scheduler.Reason.READ_READY)) {
+ logger.debug("ready to receive");
try {
- int n = connectionChannel.read(recvBuffer);
- if (n == -1) {
- currentReceiveHelper = null;
- logger.warn("lost connection to service, {}",
connectionChannel.socket().toString());
- connectionChannel.close();
- connectionChannel = null;
- if (Connection.this.currentTransmitHelper != null) {
- Connection.this.currentTransmitHelper.cancel();
- Connection.this.currentTransmitHelper = null;
- }
- try {
- receiver.handleError();
- } finally {
- return;
- }
+ processedMessage = false;
+ int n = mst.readFrom(connectionChannel, true);
+ logger.debug("read {} bytes into mst", n);
+ if (processedMessage) {
+ done = true;
+ return;
}
- logger.debug(String.format("read %s bytes from %s", n,
connectionChannel.socket().toString()));
- } catch (IOException e) {
- logger.error("read failed:", e);
- try {
- receiver.handleError();
- } finally {
+ if (-1 == n) {
+ error();
return;
}
+ } catch (IOException e) {
+ error();
+ return;
}
- if (recvBuffer.remaining() == 0) {
- if (msgh != null) {
- dispatchMessage();
- } else {
- recvBuffer.rewind();
- msgh = Construct.parseAs(recvBuffer,
GnunetMessage.Header.class);
-
- logger.debug("expecting message of size {}, type {}",
msgh.messageSize, msgh.messageType);
- if (msgh.messageSize > GnunetMessage.Header.SIZE) {
- if (recvBuffer.capacity() < msgh.messageSize) {
- ByteBuffer buf =
ByteBuffer.allocate(msgh.messageSize);
- recvBuffer.flip();
- buf.put(recvBuffer);
- recvBuffer = buf;
- }
- recvBuffer.limit(msgh.messageSize);
- schedule();
- } else {
- dispatchMessage();
- }
- }
- } else {
- schedule();
+ if (receiveDeadline.isDue()) {
+ error();
+ return;
}
+ receiveTaskId =
Scheduler.addRead(receiveDeadline.getRemaining(), connectionChannel, this);
} else if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) {
- // nothing to do!
+ done = true;
} else {
// XXX: what to do here?
throw new RuntimeException("receive failed");
}
}
-
- private void schedule() {
- working = true;
- recvTask = Scheduler.addRead(timeout, connectionChannel, this);
- }
-
- public void cancel() {
- if (finished) {
- throw new AssertionError("canceling finished receive");
- }
- if (recvTask != null) {
- recvTask.cancel();
- recvTask = null;
- }
- }
}
@@ -397,6 +328,28 @@
}
}
+ private class ConnectionMstCallback implements MstCalllback {
+ private void dispatch(GnunetMessage.Body mb) {
+ if (processedMessage) {
+ throw new AssertionError();
+ }
+ if (null == currentReceiver) {
+ throw new AssertionError();
+ }
+ currentReceiver.process(mb);
+ processedMessage = true;
+ }
+ @Override
+ public void onUnknownMessage(UnknownMessageBody b) {
+ dispatch(b);
+ }
+
+ @Override
+ public void onKnownMessage(GnunetMessage msg) {
+ dispatch(msg.body);
+ }
+ }
+
/**
* Create a connection to the given hostname/port.
*
@@ -404,12 +357,14 @@
* @param port port of the host to connect to
*/
public Connection(String hostname, int port) {
+ mst = new MessageStreamTokenizer(new ConnectionMstCallback());
addressProbes = new LinkedList<AddressProbe>();
ConnectionResolveHandler addressHandler = new
ConnectionResolveHandler(port);
resolveHandle = Resolver.getInstance().resolveHostname(hostname,
RelativeTime.FOREVER, addressHandler);
}
public Connection(SocketChannel sock) {
+ mst = new MessageStreamTokenizer(new ConnectionMstCallback());
assert sock != null;
this.connectionChannel = sock;
}
@@ -469,6 +424,10 @@
}
+ /**
+ * Actually connect the socket that select reported as ready to connect.
+ * Discards all remaining address probes.
+ */
private void finishConnect(AddressProbe probe) {
// can happen if the addres probe task was already scheduled
if (connectionChannel != null) {
@@ -512,9 +471,6 @@
if (currentTransmitHelper != null) {
currentTransmitHelper.start();
}
- if (currentReceiveHelper != null && !currentReceiveHelper.working) {
- currentReceiveHelper.schedule();
- }
Continuation c = notifyConnectedContinuation;
notifyConnectedContinuation = null;
if (notifyConnectedTimeout != null) {
@@ -544,42 +500,45 @@
return connectionChannel != null && connectionChannel.isConnected();
}
-
- public interface ReceiveHandle extends Cancelable {
- }
-
/**
* Receive one message from the network.
*
* @param timeout deadline after which receiver.onError() will be called
* @param receiver MessageReceiver that is responsible for the received
message
*/
- public ReceiveHandle receive(RelativeTime timeout, final MessageReceiver
receiver) {
- if (currentReceiveHelper != null) {
- throw new AssertionError("receive must not be called while
receiving");
- }
-
+ public void receive(final RelativeTime timeout, final MessageReceiver
receiver) {
+ if (receiveTaskId != null)
+ throw new AssertionError("already receiving");
if (!isConnected()) {
throw new AssertionError("cannot receive if not connected");
}
- recvBuffer.clear();
- recvBuffer.limit(GnunetMessage.Header.SIZE);
- final ReceiveHelper rh = new ReceiveHelper(receiver, timeout);
- currentReceiveHelper = rh;
+ currentReceiver = receiver;
+ receiveDeadline = timeout.toAbsolute();
- // we can only schedule the receive helper if we are sure the
connection is made, otherwise
- // select will misbehave!
- if (connectionChannel.isConnected()) {
- currentReceiveHelper.schedule();
- }
+ // make sure that the receiver is never called directly
+ Scheduler.add(new Scheduler.Task() {
+ @Override
+ public void run(Scheduler.RunContext ctx) {
+ // full message still in buffer?
+ processedMessage = false;
+ if (mst.extractOne()) {
+ logger.debug("full message was in buffer, not reading from
socket");
+ if (!processedMessage) {
+ throw new AssertionError();
+ }
+ return;
+ }
- return new ReceiveHandle() {
- @Override
- public void cancel() {
- rh.cancel();
+ // did we get disconnected in the mean time?
+ if (connectionChannel == null) {
+ return;
+ }
+
+ final ReceiveTask task = new ReceiveTask();
+ receiveTaskId = Scheduler.addRead(timeout, connectionChannel,
task);
}
- };
+ });
}
/**
@@ -626,7 +585,6 @@
};
}
-
/**
* Call cont after establishing the connection or when the timeout has
occured.
*
@@ -662,22 +620,18 @@
logger.error("disconnect called twice");
}
disconnected = true;
-
+ if (receiveTaskId != null) {
+ receiveTaskId.cancel();
+ receiveTaskId = null;
+ }
if (currentTransmitHelper != null) {
currentTransmitHelper.cancel();
currentTransmitHelper = null;
}
-
if (nextTransmitHelper != null) {
nextTransmitHelper.cancel();
nextTransmitHelper = null;
}
-
- if (currentReceiveHelper != null) {
- currentReceiveHelper.cancel();
- currentReceiveHelper = null;
- }
-
if (resolveHandle != null) {
resolveHandle.cancel();
resolveHandle = null;
@@ -694,5 +648,11 @@
}
connectionChannel = null;
}
+
+ if (addressProbes != null) {
+ for (AddressProbe ap : addressProbes) {
+ ap.cancel();
+ }
+ }
}
}
Modified: gnunet-java/src/main/java/org/gnunet/util/Helper.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Helper.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Helper.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -161,7 +161,13 @@
@Override
public void run(Scheduler.RunContext ctx) {
readTaskId = null;
- int n = mst.readFrom(readThread.pipe.source());
+ int n = 0;
+ try {
+ n = mst.readFrom(readThread.pipe.source(), false);
+ } catch (IOException e) {
+ logger.warn("helper reader got io exception: {}", e);
+ return;
+ }
if (n != -1 && readThread.pipe.source().isOpen()) {
readTaskId = readTaskConfig.schedule();
}
Modified: gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java
2013-10-08 23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/MessageStreamTokenizer.java
2013-10-09 00:39:02 UTC (rev 30019)
@@ -39,7 +39,7 @@
*/
public class MessageStreamTokenizer {
private static final Logger logger = LoggerFactory
- .getLogger(Service.class);
+ .getLogger(MessageStreamTokenizer.class);
private MstCalllback mstCalllback;
private ByteBuffer buffer;
GnunetMessage.Header msgh;
@@ -49,16 +49,6 @@
this.buffer = ByteBuffer.allocate(4);
}
- public void ensureBufferSize() {
- if (buffer.capacity() < msgh.messageSize) {
- ByteBuffer buf = ByteBuffer.allocate(msgh.messageSize);
- buffer.flip();
- buf.put(buffer);
- buf.flip();
- buffer = buf;
- }
- }
-
public void readAndDispatch() {
Class unionClass = null;
boolean found = true;
@@ -85,38 +75,59 @@
}
}
+
/**
- * Read from a channel into the mst.
+ * Try to extract one message from the MST, call appropriate callbacks.
*
- * @param source channel to read from
- * @return -1 on end of stream, number of bytes read otherwise
+ * @return true if message could be extracted, false if not enough data is
available
*/
- public int readFrom(ReadableByteChannel source) {
- int n;
- try {
- n = source.read(buffer);
- } catch (ClosedChannelException e) {
- return -1;
- }catch (IOException e) {
- throw new IOError(e);
- }
+ public boolean extractOne() {
+ System.out.println("trying to extract message from buffer");
if (msgh == null && buffer.position() >= 4) {
- logger.debug("got header in mst");
- // remember write position and prepare for reading
- int writePos = buffer.position();
+ // prepare for reading
buffer.flip();
msgh = Construct.parseAs(buffer, GnunetMessage.Header.class);
- ensureBufferSize();
- // prepare for writing again, and restore write position
+ // undo read
buffer.position(0);
- buffer.compact();
+ logger.debug("got header in mst, (" + buffer.limit() + "/" +
msgh.messageSize + " read)");
+ if (buffer.capacity() < msgh.messageSize) {
+ ByteBuffer newBuf = ByteBuffer.allocate(msgh.messageSize);
+ newBuf.put(buffer);
+ buffer = newBuf;
+ } else {
+ // set pos to limit and limit to capacity and
+ buffer.compact();
+ }
+ logger.debug("buffer pos is now " + buffer.position());
}
if (msgh != null && buffer.position() >= msgh.messageSize) {
buffer.flip();
readAndDispatch();
msgh = null;
buffer.compact();
+ return true;
}
+ return false;
+ }
+
+
+ /**
+ * Read from a channel into the mst. Does not call any callbacks.
+ *
+ * @param source channel to read from
+ * @return -1 on end of stream, number of bytes read otherwise
+ */
+ public int readFrom(ReadableByteChannel source, boolean oneShot) throws
IOException {
+ int n;
+ n = source.read(buffer);
+ if (oneShot) {
+ extractOne();
+ }
+ else {
+ while (extractOne()) {
+ // loop
+ }
+ }
return n;
}
}
Modified: gnunet-java/src/main/java/org/gnunet/util/Resolver.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Resolver.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Resolver.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -167,7 +167,6 @@
private boolean finished = false;
private boolean canceled = false;
private Cancelable transmitTask = null;
- private Cancelable receiveTask = null;
public void cancel() {
if (finished) {
@@ -179,9 +178,6 @@
if (queuedRequests.contains(this)) {
queuedRequests.remove(this);
} else {
- if (receiveTask != null) {
- receiveTask.cancel();
- }
if (transmitTask != null) {
transmitTask.cancel();
}
@@ -278,10 +274,9 @@
rh.transmitTask = null;
logger.debug("recv in notifyTransmitReady cb");
- rh.receiveTask = client.receiveOne(deadline.getRemaining(),
new MessageReceiver() {
+ client.receiveOne(deadline.getRemaining(), new
MessageReceiver() {
@Override
public void process(GnunetMessage.Body msg) {
- rh.receiveTask = null;
ResolverResponse gmsg = (ResolverResponse) msg;
if (gmsg.responseBody != null) {
try {
@@ -294,7 +289,7 @@
}
rh.cb.onAddress(in_addr);
- rh.receiveTask =
client.receiveOne(deadline.getRemaining(), this);
+ client.receiveOne(deadline.getRemaining(),
this);
} catch (UnknownHostException e) {
throw new
ProtocolViolationException("malformed address");
}
Modified: gnunet-java/src/main/java/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Scheduler.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Scheduler.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -328,6 +328,9 @@
}
public void addSelectEvent(SelectableChannel channel, int event) {
+ if (channel == null) {
+ throw new AssertionError("channel may not be null");
+ }
if (subscriptions == null)
subscriptions = new Subscriptions();
subscriptions.add(channel, event);
Modified: gnunet-java/src/main/java/org/gnunet/util/Server.java
===================================================================
--- gnunet-java/src/main/java/org/gnunet/util/Server.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/main/java/org/gnunet/util/Server.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -131,11 +131,6 @@
private int referenceCount = 0;
/**
- * Handle for canceling the receive process of this client, null if no
receive is currently going on.
- */
- private Cancelable currentReceive;
-
- /**
* Set to true if the connection to this client should not prevent the
server from shutting down.
*/
private boolean isMonitor;
@@ -209,14 +204,10 @@
* @param stayConnected false if connection to the client should be
closed
*/
public void receiveDone(boolean stayConnected) {
- if (currentReceive != null) {
- throw new AssertionError("receiveDone() called, but still
waiting for message");
- }
if (stayConnected) {
- currentReceive = connection.receive(RelativeTime.FOREVER, new
MessageReceiver() {
+ connection.receive(RelativeTime.FOREVER, new MessageReceiver()
{
@Override
public void process(GnunetMessage.Body msg) {
- currentReceive = null;
if ((msg instanceof UnknownMessageBody) ||
!expectedMessages.contains(msg.getClass())) {
if (requireFound) {
logger.info("disconnecting client sending
unknown message");
Modified: gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java
===================================================================
--- gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java 2013-10-08
23:15:28 UTC (rev 30018)
+++ gnunet-java/src/test/java/org/gnunet/util/ResolverTest.java 2013-10-09
00:39:02 UTC (rev 30019)
@@ -40,6 +40,7 @@
.getLogger(ResolverTest.class);
@Test
public void test_resolver() {
+ Program.configureLogging("DEBUG");
final Wrapper<Boolean> finished1 = new Wrapper<Boolean>(true);
final Wrapper<Boolean> finished2 = new Wrapper<Boolean>(true);
[Prev in Thread] Current Thread [Next in Thread]
- [GNUnet-SVN] r30019 - in gnunet-java/src: main/java/org/gnunet/util test/java/org/gnunet/util, gnunet <=